Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/task_worker_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/task_worker_pool.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/AgentService_types.h>
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/MasterService_types.h>
26
#include <gen_cpp/Status_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <unistd.h>
29
30
#include <algorithm>
31
// IWYU pragma: no_include <bits/chrono.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
34
#include <atomic>
35
#include <chrono> // IWYU pragma: keep
36
#include <ctime>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <shared_mutex>
41
#include <sstream>
42
#include <string>
43
#include <thread>
44
#include <type_traits>
45
#include <utility>
46
#include <vector>
47
48
#include "agent/utils.h"
49
#include "cloud/cloud_delete_task.h"
50
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
51
#include "cloud/cloud_schema_change_job.h"
52
#include "cloud/cloud_snapshot_loader.h"
53
#include "cloud/cloud_snapshot_mgr.h"
54
#include "cloud/cloud_tablet_mgr.h"
55
#include "cloud/config.h"
56
#include "common/config.h"
57
#include "common/logging.h"
58
#include "common/metrics/doris_metrics.h"
59
#include "common/status.h"
60
#include "io/fs/file_system.h"
61
#include "io/fs/hdfs_file_system.h"
62
#include "io/fs/local_file_system.h"
63
#include "io/fs/obj_storage_client.h"
64
#include "io/fs/path.h"
65
#include "io/fs/remote_file_system.h"
66
#include "io/fs/s3_file_system.h"
67
#include "runtime/exec_env.h"
68
#include "runtime/fragment_mgr.h"
69
#include "runtime/index_policy/index_policy_mgr.h"
70
#include "runtime/memory/global_memory_arbitrator.h"
71
#include "runtime/snapshot_loader.h"
72
#include "runtime/user_function_cache.h"
73
#include "service/backend_options.h"
74
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
75
#include "storage/data_dir.h"
76
#include "storage/olap_common.h"
77
#include "storage/rowset/rowset_meta.h"
78
#include "storage/snapshot/snapshot_manager.h"
79
#include "storage/storage_engine.h"
80
#include "storage/storage_policy.h"
81
#include "storage/tablet/tablet.h"
82
#include "storage/tablet/tablet_manager.h"
83
#include "storage/tablet/tablet_meta.h"
84
#include "storage/tablet/tablet_schema.h"
85
#include "storage/task/engine_batch_load_task.h"
86
#include "storage/task/engine_checksum_task.h"
87
#include "storage/task/engine_clone_task.h"
88
#include "storage/task/engine_cloud_index_change_task.h"
89
#include "storage/task/engine_index_change_task.h"
90
#include "storage/task/engine_publish_version_task.h"
91
#include "storage/task/engine_storage_migration_task.h"
92
#include "storage/txn/txn_manager.h"
93
#include "storage/utils.h"
94
#include "util/brpc_client_cache.h"
95
#include "util/debug_points.h"
96
#include "util/jni-util.h"
97
#include "util/mem_info.h"
98
#include "util/random.h"
99
#include "util/s3_util.h"
100
#include "util/stopwatch.hpp"
101
#include "util/threadpool.h"
102
#include "util/time.h"
103
#include "util/trace.h"
104
105
namespace doris {
106
#include "common/compile_check_begin.h"
107
using namespace ErrorCode;
108
109
namespace {
110
111
std::mutex s_task_signatures_mtx;
112
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;
113
114
std::atomic_ulong s_report_version(time(nullptr) * 100000);
115
116
12.8k
void increase_report_version() {
117
12.8k
    s_report_version.fetch_add(1, std::memory_order_relaxed);
118
12.8k
}
119
120
// FIXME(plat1ko): Paired register and remove task info
121
33.3k
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
122
33.3k
    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
123
33.3k
        task_type == TTaskType::type::PUSH_COOLDOWN_CONF ||
124
33.3k
        task_type == TTaskType::type::COMPACTION) {
125
        // no need to report task of these types
126
11
        return true;
127
11
    }
128
129
33.3k
    if (signature == -1) { // No need to report task with unintialized signature
130
59
        return true;
131
59
    }
132
133
33.3k
    std::lock_guard lock(s_task_signatures_mtx);
134
33.3k
    auto& set = s_task_signatures[task_type];
135
33.3k
    return set.insert(signature).second;
136
33.3k
}
137
138
32.0k
void remove_task_info(const TTaskType::type task_type, int64_t signature) {
139
32.0k
    size_t queue_size;
140
32.0k
    {
141
32.0k
        std::lock_guard lock(s_task_signatures_mtx);
142
32.0k
        auto& set = s_task_signatures[task_type];
143
32.0k
        set.erase(signature);
144
32.0k
        queue_size = set.size();
145
32.0k
    }
146
147
32.0k
    VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
148
0
                << ", queue_size=" << queue_size;
149
32.0k
}
150
151
31.5k
void finish_task(const TFinishTaskRequest& finish_task_request) {
152
    // Return result to FE
153
31.5k
    TMasterResult result;
154
31.5k
    uint32_t try_time = 0;
155
31.5k
    constexpr int TASK_FINISH_MAX_RETRY = 3;
156
31.8k
    while (try_time < TASK_FINISH_MAX_RETRY) {
157
31.8k
        DorisMetrics::instance()->finish_task_requests_total->increment(1);
158
31.8k
        Status client_status =
159
31.8k
                MasterServerClient::instance()->finish_task(finish_task_request, &result);
160
161
32.0k
        if (client_status.ok()) {
162
32.0k
            break;
163
18.4E
        } else {
164
18.4E
            DorisMetrics::instance()->finish_task_requests_failed->increment(1);
165
18.4E
            LOG_WARNING("failed to finish task")
166
18.4E
                    .tag("type", finish_task_request.task_type)
167
18.4E
                    .tag("signature", finish_task_request.signature)
168
18.4E
                    .error(result.status);
169
18.4E
            try_time += 1;
170
18.4E
        }
171
18.4E
        sleep(1);
172
18.4E
    }
173
31.5k
}
174
175
Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
176
0
                       const TSchemaHash schema_hash, TTabletInfo* tablet_info) {
177
0
    tablet_info->__set_tablet_id(tablet_id);
178
0
    tablet_info->__set_schema_hash(schema_hash);
179
0
    return engine.tablet_manager()->report_tablet_info(tablet_info);
180
0
}
181
182
691
void random_sleep(int second) {
183
691
    Random rnd(static_cast<uint32_t>(UnixMillis()));
184
691
    sleep(rnd.Uniform(second) + 1);
185
691
}
186
187
void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature,
188
0
                  const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
189
0
    Status status;
190
191
0
    std::string_view process_name = "alter tablet";
192
    // Check last schema change status, if failed delete tablet file
193
    // Do not need to adjust delete success or not
194
    // Because if delete failed create rollup will failed
195
0
    TTabletId new_tablet_id = 0;
196
0
    TSchemaHash new_schema_hash = 0;
197
0
    if (status.ok()) {
198
0
        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
199
0
        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
200
0
        auto mem_tracker = MemTrackerLimiter::create_shared(
201
0
                MemTrackerLimiter::Type::SCHEMA_CHANGE,
202
0
                fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
203
0
                            std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
204
0
                            std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
205
0
                            engine.memory_limitation_bytes_per_thread_for_schema_change()));
206
0
        SCOPED_ATTACH_TASK(mem_tracker);
207
0
        DorisMetrics::instance()->create_rollup_requests_total->increment(1);
208
0
        Status res = Status::OK();
209
0
        try {
210
0
            LOG_INFO("start {}", process_name)
211
0
                    .tag("signature", agent_task_req.signature)
212
0
                    .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
213
0
                    .tag("new_tablet_id", new_tablet_id)
214
0
                    .tag("mem_limit",
215
0
                         engine.memory_limitation_bytes_per_thread_for_schema_change());
216
0
            SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2,
217
0
                                std::to_string(agent_task_req.alter_tablet_req_v2.__isset.job_id
218
0
                                                       ? agent_task_req.alter_tablet_req_v2.job_id
219
0
                                                       : 0));
220
0
            status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
221
0
        } catch (const Exception& e) {
222
0
            status = e.to_status();
223
0
        }
224
0
        if (!status.ok()) {
225
0
            DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
226
0
        }
227
0
    }
228
229
0
    if (status.ok()) {
230
0
        increase_report_version();
231
0
    }
232
233
    // Return result to fe
234
0
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
235
0
    finish_task_request->__set_report_version(s_report_version);
236
0
    finish_task_request->__set_task_type(task_type);
237
0
    finish_task_request->__set_signature(signature);
238
239
0
    std::vector<TTabletInfo> finish_tablet_infos;
240
0
    if (status.ok()) {
241
0
        TTabletInfo tablet_info;
242
0
        status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info);
243
0
        if (status.ok()) {
244
0
            finish_tablet_infos.push_back(tablet_info);
245
0
        }
246
0
    }
247
248
0
    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
249
0
        LOG_WARNING("failed to {}", process_name)
250
0
                .tag("signature", agent_task_req.signature)
251
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
252
0
                .tag("new_tablet_id", new_tablet_id)
253
0
                .error(status);
254
0
    } else {
255
0
        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
256
0
        LOG_INFO("successfully {}", process_name)
257
0
                .tag("signature", agent_task_req.signature)
258
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
259
0
                .tag("new_tablet_id", new_tablet_id);
260
0
    }
261
0
    finish_task_request->__set_task_status(status.to_thrift());
262
0
}
263
264
void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req,
265
                        int64_t signature, const TTaskType::type task_type,
266
10.1k
                        TFinishTaskRequest* finish_task_request) {
267
10.1k
    Status status;
268
269
10.1k
    std::string_view process_name = "alter tablet";
270
    // Check last schema change status, if failed delete tablet file
271
    // Do not need to adjust delete success or not
272
    // Because if delete failed create rollup will failed
273
10.1k
    TTabletId new_tablet_id = 0;
274
10.1k
    new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
275
10.1k
    auto mem_tracker = MemTrackerLimiter::create_shared(
276
10.1k
            MemTrackerLimiter::Type::SCHEMA_CHANGE,
277
10.1k
            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
278
10.1k
                        std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
279
10.1k
                        std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
280
10.1k
                        engine.memory_limitation_bytes_per_thread_for_schema_change()));
281
10.1k
    SCOPED_ATTACH_TASK(mem_tracker);
282
10.1k
    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
283
284
10.1k
    LOG_INFO("start {}", process_name)
285
10.1k
            .tag("signature", agent_task_req.signature)
286
10.1k
            .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
287
10.1k
            .tag("new_tablet_id", new_tablet_id)
288
10.1k
            .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
289
10.1k
    DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
290
10.1k
    CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
291
10.1k
                             agent_task_req.alter_tablet_req_v2.expiration);
292
10.1k
    status = [&]() {
293
10.1k
        HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
294
10.1k
                job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
295
10.1k
                [&](const doris::Exception& ex) {
296
10.1k
                    DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
297
10.1k
                    job.clean_up_on_failure();
298
10.1k
                });
299
9.72k
        return Status::OK();
300
10.1k
    }();
301
302
10.1k
    if (status.ok()) {
303
9.54k
        increase_report_version();
304
9.54k
        LOG_INFO("successfully {}", process_name)
305
9.54k
                .tag("signature", agent_task_req.signature)
306
9.54k
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
307
9.54k
                .tag("new_tablet_id", new_tablet_id);
308
9.54k
    } else {
309
644
        LOG_WARNING("failed to {}", process_name)
310
644
                .tag("signature", agent_task_req.signature)
311
644
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
312
644
                .tag("new_tablet_id", new_tablet_id)
313
644
                .error(status);
314
644
    }
315
316
    // Return result to fe
317
10.1k
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
318
10.1k
    finish_task_request->__set_report_version(s_report_version);
319
10.1k
    finish_task_request->__set_task_type(task_type);
320
10.1k
    finish_task_request->__set_signature(signature);
321
10.1k
    finish_task_request->__set_task_status(status.to_thrift());
322
10.1k
}
323
324
Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req,
325
0
                             TabletSharedPtr& tablet, DataDir** dest_store) {
326
0
    int64_t tablet_id = req.tablet_id;
327
0
    tablet = engine.tablet_manager()->get_tablet(tablet_id);
328
0
    if (tablet == nullptr) {
329
0
        return Status::InternalError("could not find tablet {}", tablet_id);
330
0
    }
331
332
0
    if (req.__isset.data_dir) {
333
        // request specify the data dir
334
0
        *dest_store = engine.get_store(req.data_dir);
335
0
        if (*dest_store == nullptr) {
336
0
            return Status::InternalError("could not find data dir {}", req.data_dir);
337
0
        }
338
0
    } else {
339
        // this is a storage medium
340
        // get data dir by storage medium
341
342
        // judge case when no need to migrate
343
0
        uint32_t count = engine.available_storage_medium_type_count();
344
0
        if (count <= 1) {
345
0
            return Status::InternalError("available storage medium type count is less than 1");
346
0
        }
347
        // check current tablet storage medium
348
0
        TStorageMedium::type storage_medium = req.storage_medium;
349
0
        TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium();
350
0
        if (src_storage_medium == storage_medium) {
351
0
            return Status::InternalError("tablet is already on specified storage medium {}",
352
0
                                         storage_medium);
353
0
        }
354
        // get a random store of specified storage medium
355
0
        auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium);
356
0
        if (stores.empty()) {
357
0
            return Status::InternalError("failed to get root path for create tablet");
358
0
        }
359
360
0
        *dest_store = stores[0];
361
0
    }
362
0
    if (tablet->data_dir()->path() == (*dest_store)->path()) {
363
0
        LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
364
0
        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
365
0
                                                        tablet->data_dir()->path());
366
0
    }
367
368
    // check local disk capacity
369
0
    int64_t tablet_size = tablet->tablet_local_size();
370
0
    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
371
0
        return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
372
0
                                             (*dest_store)->path(), tablet_size);
373
0
    }
374
0
    return Status::OK();
375
0
}
376
377
// Return `true` if report success
378
bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info,
379
1.34k
                   std::string_view name) {
380
1.34k
    TMasterResult result;
381
1.34k
    Status status = MasterServerClient::instance()->report(request, &result);
382
1.34k
    if (!status.ok()) [[unlikely]] {
383
0
        LOG_WARNING("failed to report {}", name)
384
0
                .tag("host", cluster_info->master_fe_addr.hostname)
385
0
                .tag("port", cluster_info->master_fe_addr.port)
386
0
                .error(status);
387
0
        return false;
388
0
    }
389
390
1.34k
    else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
391
0
        LOG_WARNING("failed to report {}", name)
392
0
                .tag("host", cluster_info->master_fe_addr.hostname)
393
0
                .tag("port", cluster_info->master_fe_addr.port)
394
0
                .error(result.status);
395
0
        return false;
396
0
    }
397
398
1.34k
    return true;
399
1.34k
}
400
401
Status _submit_task(const TAgentTaskRequest& task,
402
33.3k
                    std::function<Status(const TAgentTaskRequest&)> submit_op) {
403
33.3k
    const TTaskType::type task_type = task.task_type;
404
33.3k
    int64_t signature = task.signature;
405
406
33.3k
    std::string type_str;
407
33.3k
    EnumToString(TTaskType, task_type, type_str);
408
18.4E
    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;
409
410
33.3k
    if (!register_task_info(task_type, signature)) {
411
1.25k
        LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
412
        // Duplicated task request, just return OK
413
1.25k
        return Status::OK();
414
1.25k
    }
415
416
    // TODO(plat1ko): check task request member
417
418
    // Set the receiving time of task so that we can determine whether it is timed out later
419
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
420
    // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok
421
32.1k
    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
422
32.1k
    auto st = submit_op(task);
423
32.1k
    if (!st.ok()) [[unlikely]] {
424
1
        LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature);
425
1
        return st;
426
1
    }
427
428
32.1k
    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
429
32.1k
    return Status::OK();
430
32.1k
}
431
432
bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
433
434
bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX");
435
bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY");
436
bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD");
437
bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD");
438
bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT");
439
bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT");
440
bvar::Adder<uint64_t> MOVE_count("task", "MOVE");
441
bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION");
442
bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY");
443
bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count("task", "PUSH_INDEX_POLICY");
444
bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF");
445
bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE");
446
bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE");
447
bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION");
448
bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK");
449
bvar::Adder<uint64_t> DELETE_count("task", "DELETE");
450
bvar::Adder<uint64_t> PUSH_count("task", "PUSH");
451
bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO");
452
bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
453
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
454
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
455
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
456
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
457
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");
458
459
64.2k
void add_task_count(const TAgentTaskRequest& task, int n) {
460
    // clang-format off
461
64.2k
    switch (task.task_type) {
462
0
    #define ADD_TASK_COUNT(type) \
463
37.4k
    case TTaskType::type:        \
464
74.8k
        type##_count << n;       \
465
37.4k
        return;
466
1.14k
    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
467
0
    ADD_TASK_COUNT(CHECK_CONSISTENCY)
468
0
    ADD_TASK_COUNT(UPLOAD)
469
0
    ADD_TASK_COUNT(DOWNLOAD)
470
0
    ADD_TASK_COUNT(MAKE_SNAPSHOT)
471
0
    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
472
0
    ADD_TASK_COUNT(MOVE)
473
0
    ADD_TASK_COUNT(COMPACTION)
474
18
    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
475
32
    ADD_TASK_COUNT(PUSH_INDEX_POLICY)
476
4
    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
477
17
    ADD_TASK_COUNT(CREATE)
478
17.8k
    ADD_TASK_COUNT(DROP)
479
54
    ADD_TASK_COUNT(PUBLISH_VERSION)
480
0
    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
481
0
    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
482
0
    ADD_TASK_COUNT(CLONE)
483
0
    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
484
0
    ADD_TASK_COUNT(GC_BINLOG)
485
66
    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
486
18.2k
    ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
487
0
    #undef ADD_TASK_COUNT
488
6.44k
    case TTaskType::REALTIME_PUSH:
489
6.44k
    case TTaskType::PUSH:
490
6.44k
        if (task.push_req.push_type == TPushType::LOAD_V2) {
491
0
            PUSH_count << n;
492
6.44k
        } else if (task.push_req.push_type == TPushType::DELETE) {
493
6.44k
            DELETE_count << n;
494
6.44k
        }
495
6.44k
        return;
496
20.3k
    case TTaskType::ALTER:
497
20.3k
    {
498
20.3k
        ALTER_count << n;
499
        // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
500
20.3k
        if (n > 0) {
501
            // only count fragment when task is actually starting
502
10.1k
            doris::g_fragment_executing_count << 1;
503
10.1k
            int64_t now = duration_cast<std::chrono::milliseconds>(
504
10.1k
                                std::chrono::system_clock::now().time_since_epoch())
505
10.1k
                                .count();
506
10.1k
            g_fragment_last_active_time.set_value(now);
507
10.1k
        }
508
20.3k
        return;
509
6.44k
    }
510
0
    default:
511
0
        return;
512
64.2k
    }
513
    // clang-format on
514
64.2k
}
515
516
bvar::Adder<uint64_t> report_task_total("report", "task_total");
517
bvar::Adder<uint64_t> report_task_failed("report", "task_failed");
518
bvar::Adder<uint64_t> report_disk_total("report", "disk_total");
519
bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed");
520
bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total");
521
bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
522
bvar::Adder<uint64_t> report_index_policy_total("report", "index_policy_total");
523
bvar::Adder<uint64_t> report_index_policy_failed("report", "index_policy_failed");
524
525
} // namespace
526
527
TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count,
528
                               std::function<void(const TAgentTaskRequest& task)> callback)
529
148
        : _callback(std::move(callback)) {
530
148
    auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name))
531
148
                      .set_min_threads(worker_count)
532
148
                      .set_max_threads(worker_count)
533
148
                      .build(&_thread_pool);
534
148
    CHECK(st.ok()) << name << ": " << st;
535
148
}
536
537
70
TaskWorkerPool::~TaskWorkerPool() {
538
70
    stop();
539
70
}
540
541
74
void TaskWorkerPool::stop() {
542
74
    if (_stopped.exchange(true)) {
543
4
        return;
544
4
    }
545
546
70
    if (_thread_pool) {
547
70
        _thread_pool->shutdown();
548
70
    }
549
70
}
550
551
33.3k
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
552
33.3k
    return _submit_task(task, [this](auto&& task) {
553
32.1k
        add_task_count(task, 1);
554
32.1k
        return _thread_pool->submit_func([this, task]() {
555
32.1k
            _callback(task);
556
32.1k
            add_task_count(task, -1);
557
32.1k
        });
558
32.1k
    });
559
33.3k
}
560
561
PriorTaskWorkerPool::PriorTaskWorkerPool(
562
        const std::string& name, int normal_worker_count, int high_prior_worker_count,
563
        std::function<void(const TAgentTaskRequest& task)> callback)
564
13
        : _callback(std::move(callback)) {
565
50
    for (int i = 0; i < normal_worker_count; ++i) {
566
37
        auto st = Thread::create(
567
37
                "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back());
568
37
        CHECK(st.ok()) << name << ": " << st;
569
37
    }
570
571
50
    for (int i = 0; i < high_prior_worker_count; ++i) {
572
37
        auto st = Thread::create(
573
37
                "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back());
574
37
        CHECK(st.ok()) << name << ": " << st;
575
37
    }
576
13
}
577
578
7
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
579
7
    stop();
580
7
}
581
582
11
void PriorTaskWorkerPool::stop() {
583
11
    {
584
11
        std::lock_guard lock(_mtx);
585
11
        if (_stopped) {
586
4
            return;
587
4
        }
588
589
7
        _stopped = true;
590
7
    }
591
0
    _normal_condv.notify_all();
592
7
    _high_prior_condv.notify_all();
593
594
38
    for (auto&& w : _workers) {
595
38
        if (w) {
596
38
            w->join();
597
38
        }
598
38
    }
599
7
}
600
601
6
Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
602
6
    return _submit_task(task, [this](auto&& task) {
603
6
        auto req = std::make_unique<TAgentTaskRequest>(task);
604
6
        add_task_count(*req, 1);
605
6
        if (req->__isset.priority && req->priority == TPriority::HIGH) {
606
4
            std::lock_guard lock(_mtx);
607
4
            _high_prior_queue.push_back(std::move(req));
608
4
            _high_prior_condv.notify_one();
609
4
            _normal_condv.notify_one();
610
4
        } else {
611
2
            std::lock_guard lock(_mtx);
612
2
            _normal_queue.push_back(std::move(req));
613
2
            _normal_condv.notify_one();
614
2
        }
615
6
        return Status::OK();
616
6
    });
617
6
}
618
619
0
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
620
0
    const TTaskType::type task_type = task.task_type;
621
0
    int64_t signature = task.signature;
622
0
    std::string type_str;
623
0
    EnumToString(TTaskType, task_type, type_str);
624
0
    auto req = std::make_unique<TAgentTaskRequest>(task);
625
626
0
    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
627
0
    do {
628
0
        std::lock_guard lock(s_task_signatures_mtx);
629
0
        auto& set = s_task_signatures[task_type];
630
0
        if (!set.contains(signature)) {
631
            // If it doesn't exist, put it directly into the priority queue
632
0
            add_task_count(*req, 1);
633
0
            set.insert(signature);
634
0
            std::lock_guard temp_lock(_mtx);
635
0
            _high_prior_queue.push_back(std::move(req));
636
0
            _high_prior_condv.notify_one();
637
0
            _normal_condv.notify_one();
638
0
            break;
639
0
        } else {
640
0
            std::lock_guard temp_lock(_mtx);
641
0
            for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
642
                // If it exists in the normal queue, cancel the task in the normal queue
643
0
                if ((*it)->signature == signature) {
644
0
                    _normal_queue.erase(it);                     // cancel the original task
645
0
                    _high_prior_queue.push_back(std::move(req)); // add the new task to the queue
646
0
                    _high_prior_condv.notify_one();
647
0
                    _normal_condv.notify_one();
648
0
                    break;
649
0
                } else {
650
0
                    ++it; // doesn't meet the condition, continue to the next one
651
0
                }
652
0
            }
653
            // If it exists in the high priority queue, no operation is needed
654
0
            LOG_INFO("task has already existed in high prior queue.").tag("signature", signature);
655
0
        }
656
0
    } while (false);
657
658
    // Set the receiving time of task so that we can determine whether it is timed out later
659
0
    task.__set_recv_time(time(nullptr));
660
661
0
    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
662
0
    return Status::OK();
663
0
}
664
665
37
void PriorTaskWorkerPool::normal_loop() {
666
58
    while (true) {
667
40
        std::unique_ptr<TAgentTaskRequest> req;
668
669
40
        {
670
40
            std::unique_lock lock(_mtx);
671
61
            _normal_condv.wait(lock, [&] {
672
61
                return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped;
673
61
            });
674
675
40
            if (_stopped) {
676
19
                return;
677
19
            }
678
679
21
            if (!_high_prior_queue.empty()) {
680
1
                req = std::move(_high_prior_queue.front());
681
1
                _high_prior_queue.pop_front();
682
20
            } else if (!_normal_queue.empty()) {
683
2
                req = std::move(_normal_queue.front());
684
2
                _normal_queue.pop_front();
685
18
            } else {
686
18
                continue;
687
18
            }
688
21
        }
689
690
3
        _callback(*req);
691
3
        add_task_count(*req, -1);
692
3
    }
693
37
}
694
695
37
void PriorTaskWorkerPool::high_prior_loop() {
696
57
    while (true) {
697
39
        std::unique_ptr<TAgentTaskRequest> req;
698
699
39
        {
700
39
            std::unique_lock lock(_mtx);
701
60
            _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; });
702
703
39
            if (_stopped) {
704
19
                return;
705
19
            }
706
707
20
            if (_high_prior_queue.empty()) {
708
0
                continue;
709
0
            }
710
711
20
            req = std::move(_high_prior_queue.front());
712
20
            _high_prior_queue.pop_front();
713
20
        }
714
715
0
        _callback(*req);
716
20
        add_task_count(*req, -1);
717
20
    }
718
37
}
719
720
ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
721
                           std::function<void()> callback)
722
29
        : _name(std::move(name)) {
723
29
    auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] {
724
29
        auto& engine = ExecEnv::GetInstance()->storage_engine();
725
29
        engine.register_report_listener(this);
726
1.41k
        while (true) {
727
1.40k
            {
728
1.40k
                std::unique_lock lock(_mtx);
729
1.40k
                _condv.wait_for(lock, std::chrono::seconds(report_interval_s),
730
2.79k
                                [&] { return _stopped || _signal; });
731
732
1.40k
                if (_stopped) {
733
13
                    break;
734
13
                }
735
736
1.39k
                if (_signal) {
737
                    // Consume received signal
738
60
                    _signal = false;
739
60
                }
740
1.39k
            }
741
742
1.39k
            if (cluster_info->master_fe_addr.port == 0) {
743
                // port == 0 means not received heartbeat yet
744
29
                LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report";
745
29
                continue;
746
29
            }
747
748
1.36k
            callback();
749
1.36k
        }
750
29
        engine.deregister_report_listener(this);
751
29
    };
752
753
29
    auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
754
29
    CHECK(st.ok()) << _name << ": " << st;
755
29
}
756
757
13
ReportWorker::~ReportWorker() {
758
13
    stop();
759
13
}
760
761
61
void ReportWorker::notify() {
762
61
    {
763
61
        std::lock_guard lock(_mtx);
764
61
        _signal = true;
765
61
    }
766
61
    _condv.notify_all();
767
61
}
768
769
14
void ReportWorker::stop() {
770
14
    {
771
14
        std::lock_guard lock(_mtx);
772
14
        if (_stopped) {
773
1
            return;
774
1
        }
775
776
13
        _stopped = true;
777
13
    }
778
0
    _condv.notify_all();
779
13
    if (_thread) {
780
13
        _thread->join();
781
13
    }
782
13
}
783
784
574
void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
785
574
    const auto& alter_inverted_index_rq = req.alter_inverted_index_req;
786
574
    LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature
787
574
              << ", tablet_id=" << alter_inverted_index_rq.tablet_id
788
574
              << ", job_id=" << alter_inverted_index_rq.job_id;
789
790
574
    Status status = Status::OK();
791
574
    auto tablet_ptr = engine.tablet_mgr().get_tablet(alter_inverted_index_rq.tablet_id);
792
574
    if (tablet_ptr != nullptr) {
793
574
        EngineCloudIndexChangeTask engine_task(engine, req.alter_inverted_index_req);
794
574
        status = engine_task.execute();
795
574
    } else {
796
0
        status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
797
0
    }
798
799
    // Return result to fe
800
574
    TFinishTaskRequest finish_task_request;
801
574
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
802
574
    finish_task_request.__set_task_type(req.task_type);
803
574
    finish_task_request.__set_signature(req.signature);
804
574
    if (!status.ok()) {
805
9
        LOG(WARNING) << "[index_change]failed to alter inverted index task, signature="
806
9
                     << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id
807
9
                     << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status;
808
565
    } else {
809
565
        LOG(INFO) << "[index_change]successfully alter inverted index task, signature="
810
565
                  << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id
811
565
                  << ", job_id=" << alter_inverted_index_rq.job_id;
812
565
    }
813
574
    finish_task_request.__set_task_status(status.to_thrift());
814
574
    finish_task(finish_task_request);
815
574
    remove_task_info(req.task_type, req.signature);
816
574
}
817
818
0
void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
819
0
    const auto& alter_inverted_index_rq = req.alter_inverted_index_req;
820
0
    LOG(INFO) << "get alter inverted index task. signature=" << req.signature
821
0
              << ", tablet_id=" << alter_inverted_index_rq.tablet_id
822
0
              << ", job_id=" << alter_inverted_index_rq.job_id;
823
824
0
    Status status = Status::OK();
825
0
    auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
826
0
    if (tablet_ptr != nullptr) {
827
0
        EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq);
828
0
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
829
0
        status = engine_task.execute();
830
0
    } else {
831
0
        status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
832
0
    }
833
834
    // Return result to fe
835
0
    TFinishTaskRequest finish_task_request;
836
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
837
0
    finish_task_request.__set_task_type(req.task_type);
838
0
    finish_task_request.__set_signature(req.signature);
839
0
    std::vector<TTabletInfo> finish_tablet_infos;
840
0
    if (!status.ok()) {
841
0
        LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature
842
0
                     << ", tablet_id=" << alter_inverted_index_rq.tablet_id
843
0
                     << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status;
844
0
    } else {
845
0
        LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature
846
0
                  << ", tablet_id=" << alter_inverted_index_rq.tablet_id
847
0
                  << ", job_id=" << alter_inverted_index_rq.job_id;
848
0
        TTabletInfo tablet_info;
849
0
        status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id,
850
0
                                 alter_inverted_index_rq.schema_hash, &tablet_info);
851
0
        if (status.ok()) {
852
0
            finish_tablet_infos.push_back(tablet_info);
853
0
        }
854
0
        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
855
0
    }
856
0
    finish_task_request.__set_task_status(status.to_thrift());
857
0
    finish_task(finish_task_request);
858
0
    remove_task_info(req.task_type, req.signature);
859
0
}
860
861
0
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
862
0
    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;
863
864
0
    Status status;
865
0
    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
866
0
    for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
867
0
        auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
868
0
        if (tablet == nullptr) {
869
0
            status = Status::NotFound("tablet not found");
870
0
            LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
871
0
                         << tablet_meta_info.tablet_id;
872
0
            continue;
873
0
        }
874
0
        bool need_to_save = false;
875
0
        if (tablet_meta_info.__isset.partition_id) {
876
            // for fix partition_id = 0
877
0
            LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
878
0
                         << "partition id from : " << tablet->tablet_meta()->partition_id()
879
0
                         << " to : " << tablet_meta_info.partition_id;
880
0
            auto succ = engine.tablet_manager()->update_tablet_partition_id(
881
0
                    tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
882
0
            if (!succ) {
883
0
                std::string err_msg = fmt::format(
884
0
                        "change be tablet id : {} partition_id : {} failed",
885
0
                        tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
886
0
                LOG(WARNING) << err_msg;
887
0
                status = Status::InvalidArgument(err_msg);
888
0
                continue;
889
0
            }
890
0
            need_to_save = true;
891
0
        }
892
0
        if (tablet_meta_info.__isset.storage_policy_id) {
893
0
            tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
894
0
            need_to_save = true;
895
0
        }
896
0
        if (tablet_meta_info.__isset.is_in_memory) {
897
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
898
0
                    tablet_meta_info.is_in_memory);
899
0
            std::shared_lock rlock(tablet->get_header_lock());
900
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
901
0
                rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
902
0
            }
903
0
            tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
904
0
            need_to_save = true;
905
0
        }
906
0
        if (tablet_meta_info.__isset.compaction_policy) {
907
0
            if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY &&
908
0
                tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) {
909
0
                status = Status::InvalidArgument(
910
0
                        "invalid compaction policy, only support for size_based or "
911
0
                        "time_series");
912
0
                continue;
913
0
            }
914
0
            tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
915
0
            need_to_save = true;
916
0
        }
917
0
        if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
918
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
919
0
                status = Status::InvalidArgument(
920
0
                        "only time series compaction policy support time series config");
921
0
                continue;
922
0
            }
923
0
            tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
924
0
                    tablet_meta_info.time_series_compaction_goal_size_mbytes);
925
0
            need_to_save = true;
926
0
        }
927
0
        if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
928
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
929
0
                status = Status::InvalidArgument(
930
0
                        "only time series compaction policy support time series config");
931
0
                continue;
932
0
            }
933
0
            tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
934
0
                    tablet_meta_info.time_series_compaction_file_count_threshold);
935
0
            need_to_save = true;
936
0
        }
937
0
        if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
938
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
939
0
                status = Status::InvalidArgument(
940
0
                        "only time series compaction policy support time series config");
941
0
                continue;
942
0
            }
943
0
            tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
944
0
                    tablet_meta_info.time_series_compaction_time_threshold_seconds);
945
0
            need_to_save = true;
946
0
        }
947
0
        if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
948
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
949
0
                status = Status::InvalidArgument(
950
0
                        "only time series compaction policy support time series config");
951
0
                continue;
952
0
            }
953
0
            tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
954
0
                    tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
955
0
            need_to_save = true;
956
0
        }
957
0
        if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
958
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
959
0
                status = Status::InvalidArgument(
960
0
                        "only time series compaction policy support time series config");
961
0
                continue;
962
0
            }
963
0
            tablet->tablet_meta()->set_time_series_compaction_level_threshold(
964
0
                    tablet_meta_info.time_series_compaction_level_threshold);
965
0
            need_to_save = true;
966
0
        }
967
0
        if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) {
968
0
            tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group(
969
0
                    tablet_meta_info.vertical_compaction_num_columns_per_group);
970
0
            need_to_save = true;
971
        }
972
0
        if (tablet_meta_info.__isset.replica_id) {
973
0
            tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
974
0
        }
975
0
        if (tablet_meta_info.__isset.binlog_config) {
976
0
            // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
977
0
            const auto& t_binlog_config = tablet_meta_info.binlog_config;
978
0
            if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
979
0
                !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) {
980
0
                status = Status::InvalidArgument("invalid binlog config, some fields not set");
981
0
                LOG(WARNING) << fmt::format(
982
0
                        "invalid binlog config, some fields not set, tablet_id={}, "
983
                        "t_binlog_config={}",
984
0
                        tablet_meta_info.tablet_id,
985
0
                        apache::thrift::ThriftDebugString(t_binlog_config));
986
0
                continue;
987
0
            }
988
0
989
0
            BinlogConfig new_binlog_config;
990
0
            new_binlog_config = tablet_meta_info.binlog_config;
991
0
            LOG(INFO) << fmt::format(
992
0
                    "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
993
0
                    "new_binlog_config={}",
994
0
                    tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(),
995
0
                    new_binlog_config.to_string());
996
0
            tablet->set_binlog_config(new_binlog_config);
997
0
            need_to_save = true;
998
0
        }
999
0
        if (tablet_meta_info.__isset.enable_single_replica_compaction) {
1000
0
            std::shared_lock rlock(tablet->get_header_lock());
1001
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
1002
0
                    tablet_meta_info.enable_single_replica_compaction);
1003
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1004
0
                rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
1005
0
                        tablet_meta_info.enable_single_replica_compaction);
1006
0
            }
1007
0
            tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
1008
0
                    tablet_meta_info.enable_single_replica_compaction);
1009
0
            need_to_save = true;
1010
0
        }
1011
0
        if (tablet_meta_info.__isset.disable_auto_compaction) {
1012
0
            std::shared_lock rlock(tablet->get_header_lock());
1013
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
1014
0
                    tablet_meta_info.disable_auto_compaction);
1015
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1016
0
                rowset_meta->tablet_schema()->set_disable_auto_compaction(
1017
0
                        tablet_meta_info.disable_auto_compaction);
1018
            }
1019
0
            tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
1020
0
                    tablet_meta_info.disable_auto_compaction);
1021
0
            need_to_save = true;
1022
0
        }
1023
0
1024
0
        if (tablet_meta_info.__isset.skip_write_index_on_load) {
1025
0
            std::shared_lock rlock(tablet->get_header_lock());
1026
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
1027
0
                    tablet_meta_info.skip_write_index_on_load);
1028
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1029
0
                rowset_meta->tablet_schema()->set_skip_write_index_on_load(
1030
0
                        tablet_meta_info.skip_write_index_on_load);
1031
0
            }
1032
0
            tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
1033
0
                    tablet_meta_info.skip_write_index_on_load);
1034
0
            need_to_save = true;
1035
0
        }
1036
        if (need_to_save) {
1037
0
            std::shared_lock rlock(tablet->get_header_lock());
1038
0
            tablet->save_meta();
1039
0
        }
1040
0
    }
1041
0
1042
0
    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
1043
0
    if (req.signature != -1) {
1044
0
        TFinishTaskRequest finish_task_request;
1045
0
        finish_task_request.__set_task_status(status.to_thrift());
1046
0
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
1047
0
        finish_task_request.__set_task_type(req.task_type);
1048
        finish_task_request.__set_signature(req.signature);
1049
0
        finish_task(finish_task_request);
1050
0
        remove_task_info(req.task_type, req.signature);
1051
0
    }
1052
0
}
1053
0
1054
0
void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1055
0
    uint32_t checksum = 0;
1056
0
    const auto& check_consistency_req = req.check_consistency_req;
1057
0
    EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id,
1058
0
                                   check_consistency_req.schema_hash, check_consistency_req.version,
1059
0
                                   &checksum);
1060
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
1061
0
    Status status = engine_task.execute();
1062
0
    if (!status.ok()) {
1063
0
        LOG_WARNING("failed to check consistency")
1064
0
                .tag("signature", req.signature)
1065
0
                .tag("tablet_id", check_consistency_req.tablet_id)
1066
0
                .error(status);
1067
0
    } else {
1068
        LOG_INFO("successfully check consistency")
1069
0
                .tag("signature", req.signature)
1070
0
                .tag("tablet_id", check_consistency_req.tablet_id)
1071
0
                .tag("checksum", checksum);
1072
0
    }
1073
0
1074
0
    TFinishTaskRequest finish_task_request;
1075
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1076
    finish_task_request.__set_task_type(req.task_type);
1077
0
    finish_task_request.__set_signature(req.signature);
1078
0
    finish_task_request.__set_task_status(status.to_thrift());
1079
0
    finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
1080
    finish_task_request.__set_request_version(check_consistency_req.version);
1081
462
1082
462
    finish_task(finish_task_request);
1083
462
    remove_task_info(req.task_type, req.signature);
1084
462
}
1085
462
1086
462
void report_task_callback(const ClusterInfo* cluster_info) {
1087
462
    TReportRequest request;
1088
462
    if (config::report_random_wait) {
1089
462
        random_sleep(5);
1090
1.45k
    }
1091
1.45k
    request.__isset.tasks = true;
1092
3.11k
    {
1093
3.11k
        std::lock_guard lock(s_task_signatures_mtx);
1094
3.11k
        auto& tasks = request.tasks;
1095
1.45k
        for (auto&& [task_type, signatures] : s_task_signatures) {
1096
462
            auto& set = tasks[task_type];
1097
462
            for (auto&& signature : signatures) {
1098
462
                set.insert(signature);
1099
462
            }
1100
462
        }
1101
462
    }
1102
0
    request.__set_backend(BackendOptions::get_local_backend());
1103
0
    request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
1104
462
    bool succ = handle_report(request, cluster_info, "task");
1105
    report_task_total << 1;
1106
57
    if (!succ) [[unlikely]] {
1107
57
        report_task_failed << 1;
1108
57
    }
1109
57
}
1110
1111
57
void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
1112
57
    TReportRequest request;
1113
    request.__set_backend(BackendOptions::get_local_backend());
1114
91
    request.__isset.disks = true;
1115
91
1116
91
    std::vector<DataDirInfo> data_dir_infos;
1117
91
    static_cast<void>(engine.get_all_data_dir_info(&data_dir_infos, true /* update */));
1118
91
1119
91
    for (auto& root_path_info : data_dir_infos) {
1120
91
        TDisk disk;
1121
91
        disk.__set_root_path(root_path_info.path);
1122
91
        disk.__set_path_hash(root_path_info.path_hash);
1123
91
        disk.__set_storage_medium(root_path_info.storage_medium);
1124
91
        disk.__set_disk_total_capacity(root_path_info.disk_capacity);
1125
91
        disk.__set_data_used_capacity(root_path_info.local_used_capacity);
1126
91
        disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
1127
57
        disk.__set_disk_available_capacity(root_path_info.available);
1128
57
        disk.__set_trash_used_capacity(root_path_info.trash_used_capacity);
1129
57
        disk.__set_used(root_path_info.is_used);
1130
57
        request.disks[root_path_info.path] = disk;
1131
57
    }
1132
57
    request.__set_num_cores(CpuInfo::num_cores());
1133
57
    request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
1134
0
                                                 ? config::pipeline_executor_size
1135
0
                                                 : CpuInfo::num_cores());
1136
57
    bool succ = handle_report(request, cluster_info, "disk");
1137
    report_disk_total << 1;
1138
133
    if (!succ) [[unlikely]] {
1139
        report_disk_failed << 1;
1140
    }
1141
}
1142
133
1143
133
void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
1144
133
    // Random sleep 1~5 seconds before doing report.
1145
133
    // In order to avoid the problem that the FE receives many report requests at the same time
1146
    // and can not be processed.
1147
133
    if (config::report_random_wait) {
1148
133
        random_sleep(5);
1149
133
    }
1150
    (void)engine; // To be used in the future
1151
1152
    TReportRequest request;
1153
    request.__set_backend(BackendOptions::get_local_backend());
1154
133
    request.__isset.disks = true;
1155
133
1156
133
    // TODO(deardeng): report disk info in cloud mode. And make it more clear
1157
133
    //                 that report CPU by using a separte report procedure
1158
133
    //                 or abstracting disk report as "host info report"
1159
133
    request.__set_num_cores(CpuInfo::num_cores());
1160
133
    request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
1161
133
                                                 ? config::pipeline_executor_size
1162
                                                 : CpuInfo::num_cores());
1163
27
    bool succ = handle_report(request, cluster_info, "disk");
1164
27
    report_disk_total << 1;
1165
27
    report_disk_failed << !succ;
1166
27
}
1167
1168
27
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
1169
27
    if (config::report_random_wait) {
1170
27
        random_sleep(5);
1171
    }
1172
27
1173
27
    TReportRequest request;
1174
27
    request.__set_backend(BackendOptions::get_local_backend());
1175
27
    request.__isset.tablets = true;
1176
27
1177
27
    increase_report_version();
1178
27
    uint64_t report_version;
1179
27
    for (int i = 0; i < 5; i++) {
1180
27
        request.tablets.clear();
1181
27
        report_version = s_report_version;
1182
        engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
1183
27
        if (report_version == s_report_version) {
1184
            break;
1185
        }
1186
    }
1187
1188
0
    if (report_version < s_report_version) {
1189
0
        // TODO llj This can only reduce the possibility for report error, but can't avoid it.
1190
0
        // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
1191
0
        // report, and the report version has a small probability that it has not been updated in time. When FE
1192
        // receives this report, it is possible to delete the new tablet.
1193
27
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
1194
27
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1195
27
        return;
1196
    }
1197
27
1198
27
    std::map<int64_t, int64_t> partitions_version;
1199
27
    engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
1200
27
    request.__set_partitions_version(std::move(partitions_version));
1201
27
1202
    int64_t max_compaction_score =
1203
            std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
1204
27
                     DorisMetrics::instance()->tablet_base_max_compaction_score->value());
1205
60
    request.__set_tablet_max_compaction_score(max_compaction_score);
1206
60
    request.__set_report_version(report_version);
1207
60
1208
60
    // report storage policy and resource
1209
60
    auto& storage_policy_list = request.storage_policy;
1210
27
    for (auto [id, version] : get_storage_policy_ids()) {
1211
27
        auto& storage_policy = storage_policy_list.emplace_back();
1212
123
        storage_policy.__set_id(id);
1213
123
        storage_policy.__set_version(version);
1214
123
    }
1215
123
    request.__isset.storage_policy = true;
1216
123
    auto& resource_list = request.resource;
1217
0
    for (auto [id_str, version] : get_storage_resource_ids()) {
1218
123
        auto& resource = resource_list.emplace_back();
1219
123
        int64_t id = -1;
1220
123
        if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
1221
123
            ec != std::errc {}) [[unlikely]] {
1222
123
            LOG(ERROR) << "invalid resource id format: " << id_str;
1223
27
        } else {
1224
            resource.__set_id(id);
1225
27
            resource.__set_version(version);
1226
27
        }
1227
27
    }
1228
0
    request.__isset.resource = true;
1229
0
1230
27
    bool succ = handle_report(request, cluster_info, "tablet");
1231
    report_tablet_total << 1;
1232
69
    if (!succ) [[unlikely]] {
1233
        report_tablet_failed << 1;
1234
    }
1235
}
1236
69
1237
69
void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
1238
69
    // Random sleep 1~5 seconds before doing report.
1239
    // In order to avoid the problem that the FE receives many report requests at the same time
1240
69
    // and can not be processed.
1241
69
    if (config::report_random_wait) {
1242
69
        random_sleep(5);
1243
    }
1244
69
1245
69
    TReportRequest request;
1246
69
    request.__set_backend(BackendOptions::get_local_backend());
1247
81
    request.__isset.tablets = true;
1248
80
1249
80
    increase_report_version();
1250
80
    uint64_t report_version;
1251
80
    uint64_t total_num_tablets = 0;
1252
68
    for (int i = 0; i < 5; i++) {
1253
68
        request.tablets.clear();
1254
80
        report_version = s_report_version;
1255
        engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
1256
69
        if (report_version == s_report_version) {
1257
1
            break;
1258
1
        }
1259
1
    }
1260
1
1261
    if (report_version < s_report_version) {
1262
68
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
1263
68
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1264
        return;
1265
68
    }
1266
68
1267
68
    request.__set_report_version(report_version);
1268
0
    request.__set_num_tablets(total_num_tablets);
1269
0
1270
68
    bool succ = handle_report(request, cluster_info, "tablet");
1271
    report_tablet_total << 1;
1272
0
    if (!succ) [[unlikely]] {
1273
0
        report_tablet_failed << 1;
1274
    }
1275
0
}
1276
0
1277
void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1278
0
    const auto& upload_request = req.upload_req;
1279
0
1280
0
    LOG(INFO) << "get upload task. signature=" << req.signature
1281
0
              << ", job_id=" << upload_request.job_id;
1282
0
1283
0
    std::map<int64_t, std::vector<std::string>> tablet_files;
1284
0
    std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1285
0
            engine, env, upload_request.job_id, req.signature, upload_request.broker_addr,
1286
0
            upload_request.broker_prop);
1287
0
    SCOPED_ATTACH_TASK(loader->resource_ctx());
1288
0
    Status status =
1289
0
            loader->init(upload_request.__isset.storage_backend ? upload_request.storage_backend
1290
                                                                : TStorageBackendType::type::BROKER,
1291
0
                         upload_request.__isset.location ? upload_request.location : "");
1292
0
    if (status.ok()) {
1293
0
        status = loader->upload(upload_request.src_dest_map, &tablet_files);
1294
0
    }
1295
0
1296
0
    if (!status.ok()) {
1297
0
        LOG_WARNING("failed to upload")
1298
0
                .tag("signature", req.signature)
1299
0
                .tag("job_id", upload_request.job_id)
1300
0
                .error(status);
1301
    } else {
1302
0
        LOG_INFO("successfully upload")
1303
0
                .tag("signature", req.signature)
1304
0
                .tag("job_id", upload_request.job_id);
1305
0
    }
1306
0
1307
0
    TFinishTaskRequest finish_task_request;
1308
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1309
0
    finish_task_request.__set_task_type(req.task_type);
1310
0
    finish_task_request.__set_signature(req.signature);
1311
0
    finish_task_request.__set_task_status(status.to_thrift());
1312
    finish_task_request.__set_tablet_files(tablet_files);
1313
0
1314
0
    finish_task(finish_task_request);
1315
0
    remove_task_info(req.task_type, req.signature);
1316
0
}
1317
0
1318
void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1319
    const auto& download_request = req.download_req;
1320
0
    LOG(INFO) << "get download task. signature=" << req.signature
1321
              << ", job_id=" << download_request.job_id
1322
0
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
1323
0
1324
0
    // TODO: download
1325
0
    std::vector<int64_t> downloaded_tablet_ids;
1326
0
1327
0
    auto status = Status::OK();
1328
0
    if (download_request.__isset.remote_tablet_snapshots) {
1329
0
        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1330
0
                engine, env, download_request.job_id, req.signature);
1331
0
        status = loader->remote_http_download(download_request.remote_tablet_snapshots,
1332
0
                                              &downloaded_tablet_ids);
1333
0
    } else {
1334
0
        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1335
0
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
1336
0
                download_request.broker_prop);
1337
0
        status = loader->init(download_request.__isset.storage_backend
1338
0
                                      ? download_request.storage_backend
1339
0
                                      : TStorageBackendType::type::BROKER,
1340
                              download_request.__isset.location ? download_request.location : "");
1341
0
        if (status.ok()) {
1342
0
            status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
1343
0
        }
1344
0
    }
1345
0
1346
0
    if (!status.ok()) {
1347
0
        LOG_WARNING("failed to download")
1348
0
                .tag("signature", req.signature)
1349
0
                .tag("job_id", download_request.job_id)
1350
0
                .error(status);
1351
    } else {
1352
0
        LOG_INFO("successfully download")
1353
0
                .tag("signature", req.signature)
1354
0
                .tag("job_id", download_request.job_id);
1355
0
    }
1356
0
1357
0
    TFinishTaskRequest finish_task_request;
1358
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1359
0
    finish_task_request.__set_task_type(req.task_type);
1360
0
    finish_task_request.__set_signature(req.signature);
1361
0
    finish_task_request.__set_task_status(status.to_thrift());
1362
    finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids);
1363
0
1364
0
    finish_task(finish_task_request);
1365
0
    remove_task_info(req.task_type, req.signature);
1366
0
}
1367
0
1368
void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1369
0
    const auto& download_request = req.download_req;
1370
    LOG(INFO) << "get download task. signature=" << req.signature
1371
0
              << ", job_id=" << download_request.job_id
1372
0
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
1373
0
1374
0
    std::vector<int64_t> transferred_tablet_ids;
1375
0
1376
0
    auto status = Status::OK();
1377
0
    if (download_request.__isset.remote_tablet_snapshots) {
1378
0
        status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
1379
0
                "remote tablet snapshot is not supported.");
1380
0
    } else {
1381
0
        std::unique_ptr<CloudSnapshotLoader> loader = std::make_unique<CloudSnapshotLoader>(
1382
0
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
1383
0
                download_request.broker_prop);
1384
0
        status = loader->init(download_request.__isset.storage_backend
1385
0
                                      ? download_request.storage_backend
1386
0
                                      : TStorageBackendType::type::BROKER,
1387
                              download_request.__isset.location ? download_request.location : "",
1388
0
                              download_request.vault_id);
1389
0
        if (status.ok()) {
1390
0
            status = loader->download(download_request.src_dest_map, &transferred_tablet_ids);
1391
0
        }
1392
0
1393
0
        if (!status.ok()) {
1394
0
            LOG_WARNING("failed to download")
1395
0
                    .tag("signature", req.signature)
1396
0
                    .tag("job_id", download_request.job_id)
1397
0
                    .error(status);
1398
        } else {
1399
0
            LOG_INFO("successfully download")
1400
0
                    .tag("signature", req.signature)
1401
0
                    .tag("job_id", download_request.job_id);
1402
0
        }
1403
0
1404
0
        TFinishTaskRequest finish_task_request;
1405
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
1406
0
        finish_task_request.__set_task_type(req.task_type);
1407
0
        finish_task_request.__set_signature(req.signature);
1408
0
        finish_task_request.__set_task_status(status.to_thrift());
1409
0
        finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids);
1410
1411
0
        finish_task(finish_task_request);
1412
0
        remove_task_info(req.task_type, req.signature);
1413
    }
1414
0
}
1415
1416
0
void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1417
0
    const auto& snapshot_request = req.snapshot_req;
1418
0
1419
0
    LOG(INFO) << "get snapshot task. signature=" << req.signature;
1420
0
1421
0
    std::string snapshot_path;
1422
    bool allow_incremental_clone = false; // not used
1423
    std::vector<std::string> snapshot_files;
1424
    Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path,
1425
0
                                                         &allow_incremental_clone);
1426
0
    if (status.ok() && snapshot_request.__isset.list_files) {
1427
0
        // list and save all snapshot files
1428
0
        // snapshot_path like: data/snapshot/20180417205230.1.86400
1429
0
        // we need to add subdir: tablet_id/schema_hash/
1430
0
        std::vector<io::FileInfo> files;
1431
0
        bool exists = true;
1432
0
        io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id,
1433
0
                                    snapshot_request.schema_hash);
1434
0
        status = io::global_local_filesystem()->list(path, true, &files, &exists);
1435
0
        if (status.ok()) {
1436
0
            for (auto& file : files) {
1437
0
                snapshot_files.push_back(file.file_name);
1438
0
            }
1439
0
        }
1440
0
    }
1441
0
    if (!status.ok()) {
1442
0
        LOG_WARNING("failed to make snapshot")
1443
0
                .tag("signature", req.signature)
1444
0
                .tag("tablet_id", snapshot_request.tablet_id)
1445
0
                .tag("version", snapshot_request.version)
1446
0
                .error(status);
1447
0
    } else {
1448
0
        LOG_INFO("successfully make snapshot")
1449
                .tag("signature", req.signature)
1450
0
                .tag("tablet_id", snapshot_request.tablet_id)
1451
0
                .tag("version", snapshot_request.version)
1452
0
                .tag("snapshot_path", snapshot_path);
1453
0
    }
1454
0
1455
0
    TFinishTaskRequest finish_task_request;
1456
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1457
    finish_task_request.__set_task_type(req.task_type);
1458
0
    finish_task_request.__set_signature(req.signature);
1459
0
    finish_task_request.__set_snapshot_path(snapshot_path);
1460
0
    finish_task_request.__set_snapshot_files(snapshot_files);
1461
    finish_task_request.__set_task_status(status.to_thrift());
1462
0
1463
0
    finish_task(finish_task_request);
1464
    remove_task_info(req.task_type, req.signature);
1465
0
}
1466
1467
0
void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1468
0
    const auto& release_snapshot_request = req.release_snapshot_req;
1469
0
1470
0
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;
1471
0
1472
0
    const std::string& snapshot_path = release_snapshot_request.snapshot_path;
1473
0
    Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);
1474
0
    if (!status.ok()) {
1475
0
        LOG_WARNING("failed to release snapshot")
1476
0
                .tag("signature", req.signature)
1477
0
                .tag("snapshot_path", snapshot_path)
1478
0
                .error(status);
1479
    } else {
1480
0
        LOG_INFO("successfully release snapshot")
1481
0
                .tag("signature", req.signature)
1482
0
                .tag("snapshot_path", snapshot_path);
1483
0
    }
1484
0
1485
    TFinishTaskRequest finish_task_request;
1486
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1487
0
    finish_task_request.__set_task_type(req.task_type);
1488
0
    finish_task_request.__set_signature(req.signature);
1489
    finish_task_request.__set_task_status(status.to_thrift());
1490
0
1491
0
    finish_task(finish_task_request);
1492
    remove_task_info(req.task_type, req.signature);
1493
0
}
1494
1495
0
void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
1496
0
    const auto& release_snapshot_request = req.release_snapshot_req;
1497
1498
0
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;
1499
0
1500
0
    Status status = engine.cloud_snapshot_mgr().release_snapshot(
1501
0
            release_snapshot_request.tablet_id, release_snapshot_request.is_job_completed);
1502
0
1503
0
    if (!status.ok()) {
1504
0
        LOG_WARNING("failed to release snapshot")
1505
0
                .tag("signature", req.signature)
1506
0
                .tag("tablet_id", release_snapshot_request.tablet_id)
1507
0
                .tag("is_job_completed", release_snapshot_request.is_job_completed)
1508
0
                .error(status);
1509
0
    } else {
1510
        LOG_INFO("successfully release snapshot")
1511
0
                .tag("signature", req.signature)
1512
0
                .tag("tablet_id", release_snapshot_request.tablet_id)
1513
0
                .tag("is_job_completed", release_snapshot_request.is_job_completed);
1514
0
    }
1515
0
1516
    TFinishTaskRequest finish_task_request;
1517
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1518
0
    finish_task_request.__set_task_type(req.task_type);
1519
0
    finish_task_request.__set_signature(req.signature);
1520
    finish_task_request.__set_task_status(status.to_thrift());
1521
0
1522
0
    finish_task(finish_task_request);
1523
    remove_task_info(req.task_type, req.signature);
1524
0
}
1525
0
1526
0
void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1527
0
    const auto& move_dir_req = req.move_dir_req;
1528
0
1529
0
    LOG(INFO) << "get move dir task. signature=" << req.signature
1530
0
              << ", job_id=" << move_dir_req.job_id;
1531
0
    Status status;
1532
0
    auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id);
1533
0
    if (tablet == nullptr) {
1534
        status = Status::InvalidArgument("Could not find tablet");
1535
0
    } else {
1536
0
        SnapshotLoader loader(engine, env, move_dir_req.job_id, move_dir_req.tablet_id);
1537
0
        status = loader.move(move_dir_req.src, tablet, true);
1538
0
    }
1539
0
1540
0
    if (!status.ok()) {
1541
0
        LOG_WARNING("failed to move dir")
1542
0
                .tag("signature", req.signature)
1543
0
                .tag("job_id", move_dir_req.job_id)
1544
0
                .tag("tablet_id", move_dir_req.tablet_id)
1545
0
                .tag("src", move_dir_req.src)
1546
0
                .error(status);
1547
0
    } else {
1548
0
        LOG_INFO("successfully move dir")
1549
                .tag("signature", req.signature)
1550
0
                .tag("job_id", move_dir_req.job_id)
1551
0
                .tag("tablet_id", move_dir_req.tablet_id)
1552
0
                .tag("src", move_dir_req.src);
1553
0
    }
1554
0
1555
    TFinishTaskRequest finish_task_request;
1556
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1557
0
    finish_task_request.__set_task_type(req.task_type);
1558
0
    finish_task_request.__set_signature(req.signature);
1559
    finish_task_request.__set_task_status(status.to_thrift());
1560
0
1561
0
    finish_task(finish_task_request);
1562
    remove_task_info(req.task_type, req.signature);
1563
0
}
1564
0
1565
void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1566
0
    const auto& move_dir_req = req.move_dir_req;
1567
0
1568
0
    LOG(INFO) << "get move dir task. signature=" << req.signature
1569
0
              << ", job_id=" << move_dir_req.job_id;
1570
0
1571
0
    Status status = engine.cloud_snapshot_mgr().commit_snapshot(move_dir_req.tablet_id);
1572
0
    if (!status.ok()) {
1573
0
        LOG_WARNING("failed to move dir")
1574
0
                .tag("signature", req.signature)
1575
0
                .tag("job_id", move_dir_req.job_id)
1576
0
                .tag("tablet_id", move_dir_req.tablet_id)
1577
0
                .error(status);
1578
0
    } else {
1579
        LOG_INFO("successfully move dir")
1580
0
                .tag("signature", req.signature)
1581
0
                .tag("job_id", move_dir_req.job_id)
1582
0
                .tag("tablet_id", move_dir_req.tablet_id);
1583
0
    }
1584
0
1585
    TFinishTaskRequest finish_task_request;
1586
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1587
0
    finish_task_request.__set_task_type(req.task_type);
1588
0
    finish_task_request.__set_signature(req.signature);
1589
    finish_task_request.__set_task_status(status.to_thrift());
1590
0
1591
0
    finish_task(finish_task_request);
1592
    remove_task_info(req.task_type, req.signature);
1593
0
}
1594
0
1595
void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1596
0
    const auto& compaction_req = req.compaction_req;
1597
0
1598
0
    LOG(INFO) << "get compaction task. signature=" << req.signature
1599
0
              << ", compaction_type=" << compaction_req.type;
1600
0
1601
0
    CompactionType compaction_type;
1602
    if (compaction_req.type == "base") {
1603
0
        compaction_type = CompactionType::BASE_COMPACTION;
1604
0
    } else {
1605
0
        compaction_type = CompactionType::CUMULATIVE_COMPACTION;
1606
0
    }
1607
0
1608
0
    auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id);
1609
0
    if (tablet_ptr != nullptr) {
1610
0
        auto* data_dir = tablet_ptr->data_dir();
1611
        if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
1612
0
            LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id()
1613
0
                         << ", compaction_type=" << compaction_type;
1614
0
            return;
1615
0
        }
1616
0
1617
0
        Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false);
1618
        if (!status.ok()) {
1619
            LOG(WARNING) << "failed to submit table compaction task. error=" << status;
1620
        }
1621
33
    }
1622
33
}
1623
33
1624
namespace {
1625
33
1626
void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1627
33
    Status st;
1628
33
    io::RemoteFileSystemSPtr fs;
1629
33
1630
7
    if (!existed_fs) {
1631
26
        // No such FS instance on BE
1632
26
        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
1633
26
                                            std::to_string(param.id));
1634
33
        if (!res.has_value()) {
1635
0
            st = std::move(res).error();
1636
0
        } else {
1637
0
            fs = std::move(res).value();
1638
0
        }
1639
0
    } else {
1640
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
1641
0
        auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
1642
        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
1643
33
        S3ClientConf conf = std::move(new_s3_conf.client_conf);
1644
7
        st = client->reset(conf);
1645
26
        fs = std::move(existed_fs);
1646
26
    }
1647
26
1648
26
    if (!st.ok()) {
1649
26
        LOG(WARNING) << "update s3 resource failed: " << st;
1650
26
    } else {
1651
33
        LOG_INFO("successfully update s3 resource")
1652
                .tag("resource_id", param.id)
1653
36
                .tag("resource_name", param.name);
1654
36
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1655
36
    }
1656
36
}
1657
36
1658
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1659
36
    Status st;
1660
    io::RemoteFileSystemSPtr fs;
1661
36
    std::string root_path =
1662
36
            param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";
1663
36
1664
36
    if (!existed_fs) {
1665
14
        // No such FS instance on BE
1666
22
        auto res = io::HdfsFileSystem::create(
1667
22
                param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
1668
22
                std::to_string(param.id), nullptr, std::move(root_path));
1669
        if (!res.has_value()) {
1670
36
            st = std::move(res).error();
1671
0
        } else {
1672
            fs = std::move(res).value();
1673
0
        }
1674
0
1675
    } else {
1676
36
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
1677
14
        // TODO(plat1ko): update hdfs conf
1678
22
        fs = std::move(existed_fs);
1679
22
    }
1680
22
1681
22
    if (!st.ok()) {
1682
22
        LOG(WARNING) << "update hdfs resource failed: " << st;
1683
22
    } else {
1684
22
        LOG_INFO("successfully update hdfs resource")
1685
36
                .tag("resource_id", param.id)
1686
                .tag("resource_name", param.name)
1687
                .tag("root_path", fs->root_path().string());
1688
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1689
9
    }
1690
9
}
1691
1692
69
} // namespace
1693
69
1694
69
void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1695
0
    const auto& push_storage_policy_req = req.push_storage_policy_req;
1696
    // refresh resource
1697
0
    for (auto&& param : push_storage_policy_req.resource) {
1698
0
        io::RemoteFileSystemSPtr fs;
1699
        if (auto existed_resource = get_storage_resource(param.id); existed_resource) {
1700
0
            if (existed_resource->second >= param.version) {
1701
0
                // Stale request, ignore
1702
                continue;
1703
69
            }
1704
33
1705
36
            fs = std::move(existed_resource->first.fs);
1706
36
        }
1707
36
1708
0
        if (param.__isset.s3_storage_param) {
1709
0
            update_s3_resource(param, std::move(fs));
1710
69
        } else if (param.__isset.hdfs_storage_param) {
1711
            update_hdfs_resource(param, std::move(fs));
1712
9
        } else {
1713
0
            LOG(WARNING) << "unknown resource=" << param;
1714
0
        }
1715
    }
1716
24
    // drop storage policy
1717
24
    for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
1718
24
        delete_storage_policy(policy_id);
1719
24
    }
1720
24
    // refresh storage policy
1721
24
    for (auto&& storage_policy : push_storage_policy_req.storage_policy) {
1722
24
        auto existed_storage_policy = get_storage_policy(storage_policy.id);
1723
24
        if (existed_storage_policy == nullptr ||
1724
24
            existed_storage_policy->version < storage_policy.version) {
1725
24
            auto storage_policy1 = std::make_shared<StoragePolicy>();
1726
24
            storage_policy1->name = storage_policy.name;
1727
24
            storage_policy1->version = storage_policy.version;
1728
24
            storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
1729
24
            storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
1730
24
            storage_policy1->resource_id = storage_policy.resource_id;
1731
24
            LOG_INFO("successfully update storage policy")
1732
9
                    .tag("storage_policy_id", storage_policy.id)
1733
                    .tag("storage_policy", storage_policy1->to_string());
1734
16
            put_storage_policy(storage_policy.id, std::move(storage_policy1));
1735
16
        }
1736
16
    }
1737
16
}
1738
16
1739
void push_index_policy_callback(const TAgentTaskRequest& req) {
1740
2
    const auto& request = req.push_index_policy_req;
1741
2
    doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes(
1742
326
            request.index_policys, request.dropped_index_policys);
1743
326
}
1744
326
1745
326
void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1746
0
    const auto& push_cooldown_conf_req = req.push_cooldown_conf;
1747
0
    for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
1748
0
        int64_t tablet_id = cooldown_conf.tablet_id;
1749
326
        TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
1750
326
        if (tablet == nullptr) {
1751
326
            LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
1752
326
            continue;
1753
2
        }
1754
2
        if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
1755
326
                                         cooldown_conf.cooldown_replica_id) &&
1756
2
            cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
1757
            tablet->tablet_meta()->cooldown_meta_id().initialized()) {
1758
0
            Tablet::async_write_cooldown_meta(tablet);
1759
0
        }
1760
0
    }
1761
0
}
1762
0
1763
0
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1764
0
    const auto& create_tablet_req = req.create_tablet_req;
1765
0
    RuntimeProfile runtime_profile("CreateTablet");
1766
0
    RuntimeProfile* profile = &runtime_profile;
1767
0
    MonotonicStopWatch watch;
1768
0
    watch.start();
1769
0
    Defer defer = [&] {
1770
0
        auto elapsed_time = static_cast<double>(watch.elapsed_time());
1771
0
        if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
1772
0
            COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
1773
0
            std::stringstream ss;
1774
0
            profile->pretty_print(&ss);
1775
            LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str();
1776
0
        }
1777
0
    };
1778
0
    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
1779
0
    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;
1780
0
1781
0
    std::vector<TTabletInfo> finish_tablet_infos;
1782
0
    VLOG_NOTICE << "create tablet: " << create_tablet_req;
1783
0
    Status status = engine.create_tablet(create_tablet_req, profile);
1784
0
    if (!status.ok()) {
1785
0
        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
1786
0
        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
1787
                .tag("signature", req.signature)
1788
0
                .tag("tablet_id", create_tablet_req.tablet_id)
1789
0
                .error(status);
1790
0
    } else {
1791
0
        increase_report_version();
1792
0
        // get path hash of the created tablet
1793
0
        TabletSharedPtr tablet;
1794
0
        {
1795
0
            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
1796
0
            tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
1797
0
        }
1798
        DCHECK(tablet != nullptr);
1799
0
        TTabletInfo tablet_info;
1800
0
        tablet_info.tablet_id = tablet->tablet_id();
1801
0
        tablet_info.schema_hash = tablet->schema_hash();
1802
0
        tablet_info.version = create_tablet_req.version;
1803
0
        // Useless but it is a required field in TTabletInfo
1804
0
        tablet_info.version_hash = 0;
1805
0
        tablet_info.row_count = 0;
1806
0
        tablet_info.data_size = 0;
1807
0
        tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
1808
0
        tablet_info.__set_replica_id(tablet->replica_id());
1809
0
        finish_tablet_infos.push_back(tablet_info);
1810
0
        LOG_INFO("successfully create tablet")
1811
0
                .tag("signature", req.signature)
1812
0
                .tag("tablet_id", create_tablet_req.tablet_id);
1813
0
    }
1814
0
    TFinishTaskRequest finish_task_request;
1815
0
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
1816
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1817
0
    finish_task_request.__set_report_version(s_report_version);
1818
0
    finish_task_request.__set_task_type(req.task_type);
1819
    finish_task_request.__set_signature(req.signature);
1820
8.94k
    finish_task_request.__set_task_status(status.to_thrift());
1821
8.94k
    finish_task(finish_task_request);
1822
8.94k
    remove_task_info(req.task_type, req.signature);
1823
8.94k
}
1824
8.94k
1825
8.94k
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1826
8.94k
    const auto& drop_tablet_req = req.drop_tablet_req;
1827
8.94k
    Status status;
1828
8.94k
    auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false);
1829
0
    if (dropped_tablet != nullptr) {
1830
0
        status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id,
1831
8.94k
                                                      drop_tablet_req.replica_id,
1832
                                                      drop_tablet_req.is_drop_table_or_partition);
1833
8.94k
    } else {
1834
8.94k
        status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id);
1835
8.94k
    }
1836
8.94k
    if (status.ok()) {
1837
8.94k
        // if tablet is dropped by fe, then the related txn should also be removed
1838
8.94k
        engine.txn_manager()->force_rollback_tablet_related_txns(
1839
8.94k
                dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
1840
8.94k
                dropped_tablet->tablet_uid());
1841
0
        LOG_INFO("successfully drop tablet")
1842
0
                .tag("signature", req.signature)
1843
0
                .tag("tablet_id", drop_tablet_req.tablet_id)
1844
0
                .tag("replica_id", drop_tablet_req.replica_id);
1845
0
    } else {
1846
0
        LOG_WARNING("failed to drop tablet")
1847
                .tag("signature", req.signature)
1848
8.94k
                .tag("tablet_id", drop_tablet_req.tablet_id)
1849
8.94k
                .tag("replica_id", drop_tablet_req.replica_id)
1850
8.94k
                .error(status);
1851
8.94k
    }
1852
8.94k
1853
    TFinishTaskRequest finish_task_request;
1854
8.94k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1855
8.94k
    finish_task_request.__set_task_type(req.task_type);
1856
8.94k
    finish_task_request.__set_signature(req.signature);
1857
8.94k
    finish_task_request.__set_task_status(status.to_thrift());
1858
1859
8.94k
    TTabletInfo tablet_info;
1860
8.94k
    tablet_info.tablet_id = drop_tablet_req.tablet_id;
1861
8.94k
    tablet_info.schema_hash = drop_tablet_req.schema_hash;
1862
    tablet_info.version = 0;
1863
8.94k
    // Useless but it is a required field in TTabletInfo
1864
8.94k
    tablet_info.version_hash = 0;
1865
8.94k
    tablet_info.row_count = 0;
1866
8.94k
    tablet_info.data_size = 0;
1867
1868
8.94k
    finish_task_request.__set_finish_tablet_infos({tablet_info});
1869
8.94k
    LOG_INFO("successfully drop tablet")
1870
8.94k
            .tag("signature", req.signature)
1871
            .tag("tablet_id", drop_tablet_req.tablet_id);
1872
0
1873
0
    finish_task(finish_task_request);
1874
    remove_task_info(req.task_type, req.signature);
1875
0
}
1876
0
1877
0
void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
1878
0
    const auto& drop_tablet_req = req.drop_tablet_req;
1879
0
    // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe
1880
0
    Defer defer = [&] { remove_task_info(req.task_type, req.signature); };
1881
0
    DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
1882
0
        LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
1883
0
                .tag("tablet_id", drop_tablet_req.tablet_id);
1884
0
        return;
1885
0
    });
1886
0
    MonotonicStopWatch watch;
1887
0
    watch.start();
1888
0
    auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
1889
0
    std::ostringstream rowset_ids_stream;
1890
0
    bool found = false;
1891
0
    for (auto& weak_tablet : weak_tablets) {
1892
0
        auto tablet = weak_tablet.lock();
1893
0
        if (tablet == nullptr) {
1894
0
            continue;
1895
0
        }
1896
        if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
1897
0
            continue;
1898
0
        }
1899
0
        found = true;
1900
0
        auto clean_rowsets = tablet->get_snapshot_rowset(true);
1901
0
        // Get first 10 rowset IDs as comma-separated string, just for log
1902
0
        int count = 0;
1903
0
        for (const auto& rowset : clean_rowsets) {
1904
0
            if (count >= 10) break;
1905
0
            if (count > 0) {
1906
                rowset_ids_stream << ",";
1907
0
            }
1908
0
            rowset_ids_stream << rowset->rowset_id().to_string();
1909
0
            count++;
1910
        }
1911
0
1912
0
        CloudTablet::recycle_cached_data(clean_rowsets);
1913
0
        break;
1914
0
    }
1915
0
1916
    if (!found) {
1917
0
        LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
1918
0
                     << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
1919
0
        return;
1920
0
    }
1921
0
1922
    engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
1923
0
    LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
1924
0
              << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
1925
              << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
1926
0
}
1927
0
1928
0
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1929
    const auto& push_req = req.push_req;
1930
1931
    LOG(INFO) << "get push task. signature=" << req.signature
1932
              << " push_type=" << push_req.push_type;
1933
0
    std::vector<TTabletInfo> tablet_infos;
1934
0
1935
0
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
1936
    // use the arg BackendService_submit_tasks_args.tasks is not const
1937
    // and push_req will be modify, so modify is ok
1938
0
    EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
1939
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
1940
0
    auto status = engine_task.execute();
1941
0
1942
0
    // Return result to fe
1943
0
    TFinishTaskRequest finish_task_request;
1944
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1945
    finish_task_request.__set_task_type(req.task_type);
1946
0
    finish_task_request.__set_signature(req.signature);
1947
0
    if (push_req.push_type == TPushType::DELETE) {
1948
0
        finish_task_request.__set_request_version(push_req.version);
1949
0
    }
1950
0
1951
0
    if (status.ok()) {
1952
0
        LOG_INFO("successfully execute push task")
1953
0
                .tag("signature", req.signature)
1954
0
                .tag("tablet_id", push_req.tablet_id)
1955
0
                .tag("push_type", push_req.push_type);
1956
0
        increase_report_version();
1957
0
        finish_task_request.__set_finish_tablet_infos(tablet_infos);
1958
0
    } else {
1959
0
        LOG_WARNING("failed to execute push task")
1960
0
                .tag("signature", req.signature)
1961
0
                .tag("tablet_id", push_req.tablet_id)
1962
                .tag("push_type", push_req.push_type)
1963
0
                .error(status);
1964
0
    }
1965
0
    finish_task_request.__set_task_status(status.to_thrift());
1966
    finish_task_request.__set_report_version(s_report_version);
1967
3.22k
1968
3.22k
    finish_task(finish_task_request);
1969
    remove_task_info(req.task_type, req.signature);
1970
3.22k
}
1971
3.22k
1972
void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
1973
    const auto& push_req = req.push_req;
1974
3.22k
1975
3.22k
    LOG(INFO) << "get push task. signature=" << req.signature
1976
3.22k
              << " push_type=" << push_req.push_type;
1977
3.22k
1978
    // Return result to fe
1979
    TFinishTaskRequest finish_task_request;
1980
3.22k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1981
0
    finish_task_request.__set_task_type(req.task_type);
1982
0
    finish_task_request.__set_signature(req.signature);
1983
0
1984
0
    // Only support DELETE in cloud mode now
1985
0
    if (push_req.push_type != TPushType::DELETE) {
1986
0
        finish_task_request.__set_task_status(
1987
                Status::NotSupported("push_type {} not is supported",
1988
3.22k
                                     std::to_string(push_req.push_type))
1989
                        .to_thrift());
1990
3.22k
        return;
1991
3.22k
    }
1992
3.22k
1993
3.04k
    finish_task_request.__set_request_version(push_req.version);
1994
3.04k
1995
3.04k
    DorisMetrics::instance()->delete_requests_total->increment(1);
1996
3.04k
    auto st = CloudDeleteTask::execute(engine, req.push_req);
1997
3.04k
    if (st.ok()) {
1998
3.04k
        LOG_INFO("successfully execute push task")
1999
                .tag("signature", req.signature)
2000
3.04k
                .tag("tablet_id", push_req.tablet_id)
2001
3.04k
                .tag("push_type", push_req.push_type);
2002
3.04k
        increase_report_version();
2003
175
        auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back();
2004
175
        // Just need tablet_id
2005
175
        tablet_info.tablet_id = push_req.tablet_id;
2006
175
        finish_task_request.__isset.finish_tablet_infos = true;
2007
175
    } else {
2008
175
        DorisMetrics::instance()->delete_requests_failed->increment(1);
2009
175
        LOG_WARNING("failed to execute push task")
2010
                .tag("signature", req.signature)
2011
3.22k
                .tag("tablet_id", push_req.tablet_id)
2012
3.22k
                .tag("push_type", push_req.push_type)
2013
                .error(st);
2014
3.22k
    }
2015
3.22k
2016
3.22k
    finish_task_request.__set_task_status(st.to_thrift());
2017
    finish_task_request.__set_report_version(s_report_version);
2018
2019
6
    finish_task(finish_task_request);
2020
27
    remove_task_info(req.task_type, req.signature);
2021
6
}
2022
2023
PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
2024
        : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count,
2025
27
                         [this](const TAgentTaskRequest& task) { publish_version_callback(task); }),
2026
27
          _engine(engine) {}
2027
27
2028
27
PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
2029
2030
27
void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) {
2031
27
    const auto& publish_version_req = req.publish_version_req;
2032
    DorisMetrics::instance()->publish_task_request_total->increment(1);
2033
27
    VLOG_NOTICE << "get publish version task. signature=" << req.signature;
2034
27
2035
27
    std::set<TTabletId> error_tablet_ids;
2036
27
    std::map<TTabletId, TVersion> succ_tablets;
2037
27
    // partition_id, tablet_id, publish_version
2038
27
    std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
2039
27
    std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
2040
27
    uint32_t retry_time = 0;
2041
27
    Status status;
2042
27
    constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
2043
27
    while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
2044
27
        succ_tablets.clear();
2045
27
        error_tablet_ids.clear();
2046
27
        table_id_to_tablet_id_to_num_delta_rows.clear();
2047
27
        EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
2048
27
                                             &succ_tablets, &discontinuous_version_tablets,
2049
27
                                             &table_id_to_tablet_id_to_num_delta_rows);
2050
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2051
0
        status = engine_task.execute();
2052
        if (status.ok()) {
2053
            break;
2054
0
        }
2055
0
2056
0
        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
2057
0
            // there are too many missing versions, it has been be added to async
2058
0
            // publish task, so no need to retry here.
2059
            if (discontinuous_version_tablets.empty()) {
2060
0
                break;
2061
0
            }
2062
0
            LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
2063
0
                                   << "transaction_id: " << publish_version_req.transaction_id;
2064
0
2065
0
            int64_t time_elapsed = time(nullptr) - req.recv_time;
2066
            if (time_elapsed > config::publish_version_task_timeout_s) {
2067
                LOG(INFO) << "task elapsed " << time_elapsed
2068
0
                          << " seconds since it is inserted to queue, it is timeout";
2069
0
                break;
2070
0
            }
2071
0
2072
0
            // Version not continuous, put to queue and wait pre version publish task execute
2073
0
            PUBLISH_VERSION_count << 1;
2074
0
            auto st = _thread_pool->submit_func([this, req] {
2075
0
                this->publish_version_callback(req);
2076
0
                PUBLISH_VERSION_count << -1;
2077
0
            });
2078
0
            if (!st.ok()) [[unlikely]] {
2079
0
                PUBLISH_VERSION_count << -1;
2080
                status = std::move(st);
2081
0
            } else {
2082
0
                return;
2083
0
            }
2084
0
        }
2085
0
2086
0
        LOG_WARNING("failed to publish version")
2087
0
                .tag("transaction_id", publish_version_req.transaction_id)
2088
                .tag("error_tablets_num", error_tablet_ids.size())
2089
27
                .tag("retry_time", retry_time)
2090
0
                .error(status);
2091
0
        ++retry_time;
2092
0
    }
2093
27
2094
27
    for (auto& item : discontinuous_version_tablets) {
2095
0
        _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
2096
                                       publish_version_req.transaction_id, false);
2097
    }
2098
0
    TFinishTaskRequest finish_task_request;
2099
0
    if (!status.ok()) [[unlikely]] {
2100
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2101
0
        // if publish failed, return failed, FE will ignore this error and
2102
0
        // check error tablet ids and FE will also republish this task
2103
27
        LOG_WARNING("failed to publish version")
2104
27
                .tag("signature", req.signature)
2105
27
                .tag("transaction_id", publish_version_req.transaction_id)
2106
27
                .tag("error_tablets_num", error_tablet_ids.size())
2107
159
                .error(status);
2108
159
    } else {
2109
159
        if (!config::disable_auto_compaction &&
2110
159
            (!config::enable_compaction_pause_on_high_memory ||
2111
159
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
2112
159
            for (auto [tablet_id, _] : succ_tablets) {
2113
159
                TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
2114
159
                if (tablet != nullptr) {
2115
159
                    if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
2116
159
                        tablet->published_count.fetch_add(1);
2117
159
                        int64_t published_count = tablet->published_count.load();
2118
0
                        int32_t max_version_config = tablet->max_version_config();
2119
0
                        if (tablet->exceed_version_limit(
2120
0
                                    max_version_config *
2121
0
                                    config::load_trigger_compaction_version_percent / 100) &&
2122
0
                            published_count % 20 == 0) {
2123
0
                            auto st = _engine.submit_compaction_task(
2124
0
                                    tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
2125
0
                            if (!st.ok()) [[unlikely]] {
2126
0
                                LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
2127
0
                                             << ", published=" << published_count << " : " << st;
2128
159
                            } else {
2129
159
                                LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
2130
0
                                          << ", published:" << published_count;
2131
0
                            }
2132
159
                        }
2133
27
                    }
2134
27
                } else {
2135
27
                    LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
2136
27
                }
2137
27
            }
2138
27
        }
2139
27
        int64_t cost_second = time(nullptr) - req.recv_time;
2140
27
        g_publish_version_latency << cost_second;
2141
27
        LOG_INFO("successfully publish version")
2142
                .tag("signature", req.signature)
2143
27
                .tag("transaction_id", publish_version_req.transaction_id)
2144
27
                .tag("tablets_num", succ_tablets.size())
2145
27
                .tag("cost(s)", cost_second);
2146
27
    }
2147
27
2148
27
    status.to_thrift(&finish_task_request.task_status);
2149
27
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2150
27
    finish_task_request.__set_task_type(req.task_type);
2151
27
    finish_task_request.__set_signature(req.signature);
2152
27
    finish_task_request.__set_report_version(s_report_version);
2153
27
    finish_task_request.__set_succ_tablets(succ_tablets);
2154
27
    finish_task_request.__set_error_tablet_ids(
2155
27
            std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
2156
    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
2157
0
            table_id_to_tablet_id_to_num_delta_rows);
2158
0
    finish_task(finish_task_request);
2159
0
    remove_task_info(req.task_type, req.signature);
2160
0
}
2161
0
2162
void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2163
0
    const auto& clear_transaction_task_req = req.clear_transaction_task_req;
2164
    LOG(INFO) << "get clear transaction task. signature=" << req.signature
2165
0
              << ", transaction_id=" << clear_transaction_task_req.transaction_id
2166
              << ", partition_id_size=" << clear_transaction_task_req.partition_id.size();
2167
2168
    Status status;
2169
0
2170
0
    if (clear_transaction_task_req.transaction_id > 0) {
2171
0
        // transaction_id should be greater than zero.
2172
0
        // If it is not greater than zero, no need to execute
2173
0
        // the following clear_transaction_task() function.
2174
0
        if (!clear_transaction_task_req.partition_id.empty()) {
2175
0
            engine.clear_transaction_task(clear_transaction_task_req.transaction_id,
2176
0
                                          clear_transaction_task_req.partition_id);
2177
0
        } else {
2178
0
            engine.clear_transaction_task(clear_transaction_task_req.transaction_id);
2179
0
        }
2180
0
        LOG(INFO) << "finish to clear transaction task. signature=" << req.signature
2181
                  << ", transaction_id=" << clear_transaction_task_req.transaction_id;
2182
0
    } else {
2183
0
        LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id
2184
0
                     << ". signature= " << req.signature;
2185
0
    }
2186
0
2187
    TFinishTaskRequest finish_task_request;
2188
0
    finish_task_request.__set_task_status(status.to_thrift());
2189
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2190
0
    finish_task_request.__set_task_type(req.task_type);
2191
    finish_task_request.__set_signature(req.signature);
2192
0
2193
0
    finish_task(finish_task_request);
2194
0
    remove_task_info(req.task_type, req.signature);
2195
0
}
2196
0
2197
0
void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2198
0
    int64_t signature = req.signature;
2199
0
    LOG(INFO) << "get alter table task, signature: " << signature;
2200
0
    bool is_task_timeout = false;
2201
0
    if (req.__isset.recv_time) {
2202
0
        int64_t time_elapsed = time(nullptr) - req.recv_time;
2203
0
        if (time_elapsed > config::report_task_interval_seconds * 20) {
2204
0
            LOG(INFO) << "task elapsed " << time_elapsed
2205
0
                      << " seconds since it is inserted to queue, it is timeout";
2206
0
            is_task_timeout = true;
2207
0
        }
2208
0
    }
2209
0
    if (!is_task_timeout) {
2210
0
        TFinishTaskRequest finish_task_request;
2211
0
        TTaskType::type task_type = req.task_type;
2212
0
        alter_tablet(engine, req, signature, task_type, &finish_task_request);
2213
0
        finish_task(finish_task_request);
2214
0
    }
2215
0
    doris::g_fragment_executing_count << -1;
2216
0
    int64_t now = duration_cast<std::chrono::milliseconds>(
2217
                          std::chrono::system_clock::now().time_since_epoch())
2218
10.1k
                          .count();
2219
10.1k
    g_fragment_last_active_time.set_value(now);
2220
10.1k
    remove_task_info(req.task_type, req.signature);
2221
10.1k
}
2222
10.1k
2223
10.1k
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2224
10.1k
    int64_t signature = req.signature;
2225
0
    LOG(INFO) << "get alter table task, signature: " << signature;
2226
0
    bool is_task_timeout = false;
2227
0
    if (req.__isset.recv_time) {
2228
0
        int64_t time_elapsed = time(nullptr) - req.recv_time;
2229
10.1k
        if (time_elapsed > config::report_task_interval_seconds * 20) {
2230
10.1k
            LOG(INFO) << "task elapsed " << time_elapsed
2231
10.1k
                      << " seconds since it is inserted to queue, it is timeout";
2232
10.1k
            is_task_timeout = true;
2233
10.1k
        }
2234
10.1k
    }
2235
10.1k
    if (!is_task_timeout) {
2236
10.1k
        TFinishTaskRequest finish_task_request;
2237
10.1k
        TTaskType::type task_type = req.task_type;
2238
10.1k
        alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request);
2239
10.1k
        finish_task(finish_task_request);
2240
10.1k
    }
2241
10.1k
    doris::g_fragment_executing_count << -1;
2242
10.1k
    int64_t now = duration_cast<std::chrono::milliseconds>(
2243
                          std::chrono::system_clock::now().time_since_epoch())
2244
0
                          .count();
2245
0
    g_fragment_last_active_time.set_value(now);
2246
0
    remove_task_info(req.task_type, req.signature);
2247
0
}
2248
0
2249
0
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2250
0
    std::unordered_map<int64_t, int64_t> gc_tablet_infos;
2251
0
    if (!req.__isset.gc_binlog_req) {
2252
0
        LOG(WARNING) << "gc binlog task is not valid";
2253
0
        return;
2254
    }
2255
0
    if (!req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
2256
0
        LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
2257
        return;
2258
0
    }
2259
0
2260
    const auto& tablet_gc_binlog_infos = req.gc_binlog_req.tablet_gc_binlog_infos;
2261
0
    for (auto&& tablet_info : tablet_gc_binlog_infos) {
2262
0
        // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash);
2263
        gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
2264
33
    }
2265
33
2266
33
    engine.gc_binlogs(gc_tablet_infos);
2267
33
}
2268
33
2269
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2270
    const TVisibleVersionReq& visible_version_req = req.visible_version_req;
2271
0
    engine.tablet_manager()->update_partitions_visible_version(
2272
0
            visible_version_req.partition_version);
2273
}
2274
0
2275
0
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
2276
                    const TAgentTaskRequest& req) {
2277
0
    const auto& clone_req = req.clone_req;
2278
0
2279
0
    DorisMetrics::instance()->clone_requests_total->increment(1);
2280
0
    LOG(INFO) << "get clone task. signature=" << req.signature;
2281
2282
0
    std::vector<TTabletInfo> tablet_infos;
2283
0
    EngineCloneTask engine_task(engine, clone_req, cluster_info, req.signature, &tablet_infos);
2284
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2285
0
    auto status = engine_task.execute();
2286
0
    // Return result to fe
2287
    TFinishTaskRequest finish_task_request;
2288
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2289
0
    finish_task_request.__set_task_type(req.task_type);
2290
0
    finish_task_request.__set_signature(req.signature);
2291
0
    finish_task_request.__set_task_status(status.to_thrift());
2292
0
2293
0
    if (!status.ok()) {
2294
0
        DorisMetrics::instance()->clone_requests_failed->increment(1);
2295
0
        LOG_WARNING("failed to clone tablet")
2296
0
                .tag("signature", req.signature)
2297
0
                .tag("tablet_id", clone_req.tablet_id)
2298
0
                .error(status);
2299
0
    } else {
2300
        LOG_INFO("successfully clone tablet")
2301
0
                .tag("signature", req.signature)
2302
0
                .tag("tablet_id", clone_req.tablet_id)
2303
0
                .tag("copy_size", engine_task.get_copy_size())
2304
0
                .tag("copy_time_ms", engine_task.get_copy_time_ms());
2305
0
2306
0
        if (engine_task.is_new_tablet()) {
2307
0
            increase_report_version();
2308
0
            finish_task_request.__set_report_version(s_report_version);
2309
        }
2310
0
        finish_task_request.__set_finish_tablet_infos(tablet_infos);
2311
0
        finish_task_request.__set_copy_size(engine_task.get_copy_size());
2312
0
        finish_task_request.__set_copy_time_ms(engine_task.get_copy_time_ms());
2313
    }
2314
0
2315
0
    finish_task(finish_task_request);
2316
    remove_task_info(req.task_type, req.signature);
2317
}
2318
0
2319
0
void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2320
    const auto& storage_medium_migrate_req = req.storage_medium_migrate_req;
2321
0
2322
0
    // check request and get info
2323
0
    TabletSharedPtr tablet;
2324
0
    DataDir* dest_store = nullptr;
2325
0
2326
0
    auto status = check_migrate_request(engine, storage_medium_migrate_req, tablet, &dest_store);
2327
    if (status.ok()) {
2328
0
        EngineStorageMigrationTask engine_task(engine, tablet, dest_store);
2329
0
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2330
0
        status = engine_task.execute();
2331
0
    }
2332
0
    // fe should ignore this err
2333
0
    if (status.is<FILE_ALREADY_EXIST>()) {
2334
0
        status = Status::OK();
2335
0
    }
2336
0
    if (!status.ok()) {
2337
0
        LOG_WARNING("failed to migrate storage medium")
2338
0
                .tag("signature", req.signature)
2339
0
                .tag("tablet_id", storage_medium_migrate_req.tablet_id)
2340
0
                .error(status);
2341
    } else {
2342
0
        LOG_INFO("successfully migrate storage medium")
2343
0
                .tag("signature", req.signature)
2344
0
                .tag("tablet_id", storage_medium_migrate_req.tablet_id);
2345
0
    }
2346
0
2347
    TFinishTaskRequest finish_task_request;
2348
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2349
0
    finish_task_request.__set_task_type(req.task_type);
2350
0
    finish_task_request.__set_signature(req.signature);
2351
    finish_task_request.__set_task_status(status.to_thrift());
2352
9.10k
2353
9.10k
    finish_task(finish_task_request);
2354
9.10k
    remove_task_info(req.task_type, req.signature);
2355
9.10k
}
2356
9.10k
2357
9.10k
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2358
9.10k
    std::vector<TTabletId> error_tablet_ids;
2359
9.10k
    std::vector<TTabletId> succ_tablet_ids;
2360
9.10k
    Status status;
2361
9.10k
    error_tablet_ids.clear();
2362
    const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req;
2363
0
    CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids,
2364
0
                                                &succ_tablet_ids);
2365
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2366
0
    if (req.signature != calc_delete_bitmap_req.transaction_id) {
2367
9.10k
        // transaction_id may not be the same as req.signature, so add a log here
2368
        LOG_INFO("begin to execute calc delete bitmap task")
2369
9.10k
                .tag("signature", req.signature)
2370
9.10k
                .tag("transaction_id", calc_delete_bitmap_req.transaction_id);
2371
0
    }
2372
0
    status = engine_task.execute();
2373
0
2374
0
    TFinishTaskRequest finish_task_request;
2375
0
    if (!status) {
2376
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2377
0
        LOG_WARNING("failed to calculate delete bitmap")
2378
                .tag("signature", req.signature)
2379
9.10k
                .tag("transaction_id", calc_delete_bitmap_req.transaction_id)
2380
9.10k
                .tag("error_tablets_num", error_tablet_ids.size())
2381
9.10k
                .error(status);
2382
9.10k
    }
2383
9.10k
2384
9.10k
    status.to_thrift(&finish_task_request.task_status);
2385
9.10k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2386
    finish_task_request.__set_task_type(req.task_type);
2387
9.10k
    finish_task_request.__set_signature(req.signature);
2388
9.10k
    finish_task_request.__set_report_version(s_report_version);
2389
9.10k
    finish_task_request.__set_error_tablet_ids(error_tablet_ids);
2390
    finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions);
2391
0
2392
0
    finish_task(finish_task_request);
2393
0
    remove_task_info(req.task_type, req.signature);
2394
0
}
2395
0
2396
0
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2397
0
    LOG(INFO) << "clean trash start";
2398
    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
2399
0
    static_cast<void>(engine.start_trash_sweep(nullptr, true));
2400
0
    static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
2401
    LOG(INFO) << "clean trash finish";
2402
0
}
2403
0
2404
0
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2405
    const auto& clean_req = req.clean_udf_cache_req;
2406
0
2407
0
    if (doris::config::enable_java_support) {
2408
0
        static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2409
    }
2410
0
2411
0
    if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2412
        UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
2413
597
    }
2414
597
2415
597
    LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
2416
597
}
2417
27.5k
2418
27.5k
void report_index_policy_callback(const ClusterInfo* cluster_info) {
2419
27.5k
    TReportRequest request;
2420
597
    auto& index_policy_list = request.index_policy;
2421
597
    const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys();
2422
597
    for (const auto& policy : policys) {
2423
597
        index_policy_list.emplace_back(policy.second);
2424
597
    }
2425
0
    request.__isset.index_policy = true;
2426
0
    request.__set_backend(BackendOptions::get_local_backend());
2427
597
    bool succ = handle_report(request, cluster_info, "index_policy");
2428
    report_index_policy_total << 1;
2429
    if (!succ) [[unlikely]] {
2430
        report_index_policy_failed << 1;
2431
    }
2432
}
2433
2434
#include "common/compile_check_end.h"
2435
} // namespace doris