Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/task_worker_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/task_worker_pool.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/AgentService_types.h>
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/MasterService_types.h>
26
#include <gen_cpp/Status_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <unistd.h>
29
30
#include <algorithm>
31
// IWYU pragma: no_include <bits/chrono.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
34
#include <atomic>
35
#include <chrono> // IWYU pragma: keep
36
#include <ctime>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <shared_mutex>
41
#include <sstream>
42
#include <string>
43
#include <thread>
44
#include <type_traits>
45
#include <utility>
46
#include <vector>
47
48
#include "agent/utils.h"
49
#include "cloud/cloud_delete_task.h"
50
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
51
#include "cloud/cloud_schema_change_job.h"
52
#include "cloud/cloud_snapshot_loader.h"
53
#include "cloud/cloud_snapshot_mgr.h"
54
#include "cloud/cloud_tablet.h"
55
#include "cloud/cloud_tablet_mgr.h"
56
#include "cloud/config.h"
57
#include "common/config.h"
58
#include "common/logging.h"
59
#include "common/metrics/doris_metrics.h"
60
#include "common/status.h"
61
#include "io/fs/file_system.h"
62
#include "io/fs/hdfs_file_system.h"
63
#include "io/fs/local_file_system.h"
64
#include "io/fs/obj_storage_client.h"
65
#include "io/fs/path.h"
66
#include "io/fs/remote_file_system.h"
67
#include "io/fs/s3_file_system.h"
68
#include "runtime/exec_env.h"
69
#include "runtime/fragment_mgr.h"
70
#include "runtime/index_policy/index_policy_mgr.h"
71
#include "runtime/memory/global_memory_arbitrator.h"
72
#include "runtime/snapshot_loader.h"
73
#include "runtime/user_function_cache.h"
74
#include "service/backend_options.h"
75
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
76
#include "storage/data_dir.h"
77
#include "storage/olap_common.h"
78
#include "storage/rowset/rowset_meta.h"
79
#include "storage/snapshot/snapshot_manager.h"
80
#include "storage/storage_engine.h"
81
#include "storage/storage_policy.h"
82
#include "storage/tablet/tablet.h"
83
#include "storage/tablet/tablet_manager.h"
84
#include "storage/tablet/tablet_meta.h"
85
#include "storage/tablet/tablet_schema.h"
86
#include "storage/task/engine_batch_load_task.h"
87
#include "storage/task/engine_checksum_task.h"
88
#include "storage/task/engine_clone_task.h"
89
#include "storage/task/engine_cloud_index_change_task.h"
90
#include "storage/task/engine_index_change_task.h"
91
#include "storage/task/engine_publish_version_task.h"
92
#include "storage/task/engine_storage_migration_task.h"
93
#include "storage/txn/txn_manager.h"
94
#include "storage/utils.h"
95
#include "util/brpc_client_cache.h"
96
#include "util/debug_points.h"
97
#include "util/jni-util.h"
98
#include "util/mem_info.h"
99
#include "util/random.h"
100
#include "util/s3_util.h"
101
#include "util/stopwatch.hpp"
102
#include "util/threadpool.h"
103
#include "util/time.h"
104
#include "util/trace.h"
105
106
namespace doris {
107
using namespace ErrorCode;
108
109
namespace {
110
111
std::mutex s_task_signatures_mtx;
112
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;
113
114
std::atomic_ulong s_report_version(time(nullptr) * 100000);
115
116
0
void increase_report_version() {
117
0
    s_report_version.fetch_add(1, std::memory_order_relaxed);
118
0
}
119
120
// FIXME(plat1ko): Paired register and remove task info
121
14
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
122
14
    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
123
14
        task_type == TTaskType::type::PUSH_COOLDOWN_CONF ||
124
14
        task_type == TTaskType::type::COMPACTION) {
125
        // no need to report task of these types
126
0
        return true;
127
0
    }
128
129
14
    if (signature == -1) { // No need to report task with unintialized signature
130
12
        return true;
131
12
    }
132
133
2
    std::lock_guard lock(s_task_signatures_mtx);
134
2
    auto& set = s_task_signatures[task_type];
135
2
    return set.insert(signature).second;
136
14
}
137
138
0
// Drop the record of (`task_type`, `signature`) and log how many signatures of that type remain.
// Erasing a signature that was never registered is a no-op.
void remove_task_info(const TTaskType::type task_type, int64_t signature) {
    const size_t queue_size = [&] {
        std::lock_guard guard(s_task_signatures_mtx);
        auto& in_flight = s_task_signatures[task_type];
        in_flight.erase(signature);
        return in_flight.size();
    }();

    VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
                << ", queue_size=" << queue_size;
}
150
151
0
void finish_task(const TFinishTaskRequest& finish_task_request) {
152
    // Return result to FE
153
0
    TMasterResult result;
154
0
    uint32_t try_time = 0;
155
0
    constexpr int TASK_FINISH_MAX_RETRY = 3;
156
0
    while (try_time < TASK_FINISH_MAX_RETRY) {
157
0
        DorisMetrics::instance()->finish_task_requests_total->increment(1);
158
0
        Status client_status =
159
0
                MasterServerClient::instance()->finish_task(finish_task_request, &result);
160
161
0
        if (client_status.ok()) {
162
0
            break;
163
0
        } else {
164
0
            DorisMetrics::instance()->finish_task_requests_failed->increment(1);
165
0
            LOG_WARNING("failed to finish task")
166
0
                    .tag("type", finish_task_request.task_type)
167
0
                    .tag("signature", finish_task_request.signature)
168
0
                    .error(result.status);
169
0
            try_time += 1;
170
0
        }
171
0
        sleep(1);
172
0
    }
173
0
}
174
175
// Stamp `tablet_info` with the requested tablet id and schema hash, then let the tablet manager
// fill in the rest. Returns whatever status the tablet manager reports.
Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
                       const TSchemaHash schema_hash, TTabletInfo* tablet_info) {
    tablet_info->__set_tablet_id(tablet_id);
    tablet_info->__set_schema_hash(schema_hash);
    auto* tablet_mgr = engine.tablet_manager();
    return tablet_mgr->report_tablet_info(tablet_info);
}
181
182
0
void random_sleep(int second) {
183
0
    Random rnd(static_cast<uint32_t>(UnixMillis()));
184
0
    sleep(rnd.Uniform(second) + 1);
185
0
}
186
187
// Run a local-storage schema change / rollup described by `agent_task_req.alter_tablet_req_v2`
// and fill `finish_task_request` with the result that is sent back to FE.
//
// On success the report version is bumped and the new tablet's info is attached to the request.
// A NOT_IMPLEMENTED_ERROR status is not logged as a failure and still attaches the (then empty)
// tablet info list, but that status is reported to FE unchanged.
void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature,
                  const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
    Status status;

    std::string_view process_name = "alter tablet";
    const auto& alter_req = agent_task_req.alter_tablet_req_v2;
    const TTabletId new_tablet_id = alter_req.new_tablet_id;
    const TSchemaHash new_schema_hash = alter_req.new_schema_hash;

    {
        // Keep the memory-tracker attachment limited to the schema change itself.
        // NOTE(review): memory_limitation_bytes_per_thread_for_schema_change() is only logged as
        // "mem_limit" below; it is not handed to the tracker as a byte limit. If a limit is
        // intended it presumably belongs in create_shared()'s limit argument — confirm before
        // enabling, since it could make large schema changes fail.
        auto mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::SCHEMA_CHANGE,
                fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                            alter_req.base_tablet_id, alter_req.new_tablet_id));
        SCOPED_ATTACH_TASK(mem_tracker);
        DorisMetrics::instance()->create_rollup_requests_total->increment(1);
        try {
            LOG_INFO("start {}", process_name)
                    .tag("signature", agent_task_req.signature)
                    .tag("base_tablet_id", alter_req.base_tablet_id)
                    .tag("new_tablet_id", new_tablet_id)
                    .tag("mem_limit",
                         engine.memory_limitation_bytes_per_thread_for_schema_change());
            SchemaChangeJob job(engine, alter_req,
                                std::to_string(alter_req.__isset.job_id ? alter_req.job_id : 0));
            status = job.process_alter_tablet(alter_req);
        } catch (const Exception& e) {
            status = e.to_status();
        }
        if (!status.ok()) {
            DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
        }
    }

    if (status.ok()) {
        increase_report_version();
    }

    // Return result to fe
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
    finish_task_request->__set_report_version(s_report_version);
    finish_task_request->__set_task_type(task_type);
    finish_task_request->__set_signature(signature);

    std::vector<TTabletInfo> finish_tablet_infos;
    if (status.ok()) {
        TTabletInfo tablet_info;
        status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info);
        if (status.ok()) {
            finish_tablet_infos.push_back(std::move(tablet_info));
        }
    }

    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
        LOG_WARNING("failed to {}", process_name)
                .tag("signature", agent_task_req.signature)
                .tag("base_tablet_id", alter_req.base_tablet_id)
                .tag("new_tablet_id", new_tablet_id)
                .error(status);
    } else {
        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
        LOG_INFO("successfully {}", process_name)
                .tag("signature", agent_task_req.signature)
                .tag("base_tablet_id", alter_req.base_tablet_id)
                .tag("new_tablet_id", new_tablet_id);
    }
    finish_task_request->__set_task_status(status.to_thrift());
}
263
264
void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req,
265
                        int64_t signature, const TTaskType::type task_type,
266
0
                        TFinishTaskRequest* finish_task_request) {
267
0
    Status status;
268
269
0
    std::string_view process_name = "alter tablet";
270
    // Check last schema change status, if failed delete tablet file
271
    // Do not need to adjust delete success or not
272
    // Because if delete failed create rollup will failed
273
0
    TTabletId new_tablet_id = 0;
274
0
    new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
275
0
    auto mem_tracker = MemTrackerLimiter::create_shared(
276
0
            MemTrackerLimiter::Type::SCHEMA_CHANGE,
277
0
            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
278
0
                        std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
279
0
                        std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
280
0
                        engine.memory_limitation_bytes_per_thread_for_schema_change()));
281
0
    SCOPED_ATTACH_TASK(mem_tracker);
282
0
    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
283
284
0
    LOG_INFO("start {}", process_name)
285
0
            .tag("signature", agent_task_req.signature)
286
0
            .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
287
0
            .tag("new_tablet_id", new_tablet_id)
288
0
            .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
289
0
    DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
290
0
    CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
291
0
                             agent_task_req.alter_tablet_req_v2.expiration);
292
0
    status = [&]() {
293
0
        HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
294
0
                job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
295
0
                [&](const doris::Exception& ex) {
296
0
                    DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
297
0
                    job.clean_up_on_failure();
298
0
                });
299
0
        return Status::OK();
300
0
    }();
301
302
0
    if (status.ok()) {
303
0
        increase_report_version();
304
0
        LOG_INFO("successfully {}", process_name)
305
0
                .tag("signature", agent_task_req.signature)
306
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
307
0
                .tag("new_tablet_id", new_tablet_id);
308
0
    } else {
309
0
        LOG_WARNING("failed to {}", process_name)
310
0
                .tag("signature", agent_task_req.signature)
311
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
312
0
                .tag("new_tablet_id", new_tablet_id)
313
0
                .error(status);
314
0
    }
315
316
    // Return result to fe
317
0
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
318
0
    finish_task_request->__set_report_version(s_report_version);
319
0
    finish_task_request->__set_task_type(task_type);
320
0
    finish_task_request->__set_signature(signature);
321
0
    finish_task_request->__set_task_status(status.to_thrift());
322
0
}
323
324
// Validate a storage-medium migration request and resolve its tablet and destination data dir.
//
// On success `tablet` is the tablet to migrate and `*dest_store` the destination. The
// destination is the data dir named in the request if one is given; otherwise it is the first
// store returned by get_stores_for_create_tablet() for the requested medium. Fails if:
// - the tablet or the named data dir cannot be found;
// - fewer than two storage medium types are available;
// - the tablet is already on the requested medium or path;
// - the destination would exceed its capacity limit.
Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req,
                             TabletSharedPtr& tablet, DataDir** dest_store) {
    int64_t tablet_id = req.tablet_id;
    tablet = engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        return Status::InternalError("could not find tablet {}", tablet_id);
    }

    if (req.__isset.data_dir) {
        // request specify the data dir
        *dest_store = engine.get_store(req.data_dir);
        if (*dest_store == nullptr) {
            return Status::InternalError("could not find data dir {}", req.data_dir);
        }
    } else {
        // Migrating by storage medium needs at least two medium types to choose between.
        uint32_t count = engine.available_storage_medium_type_count();
        if (count <= 1) {
            return Status::InternalError(
                    "available storage medium type count is less than 2, count={}", count);
        }
        // check current tablet storage medium
        TStorageMedium::type storage_medium = req.storage_medium;
        TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium();
        if (src_storage_medium == storage_medium) {
            return Status::InternalError("tablet is already on specified storage medium {}",
                                         storage_medium);
        }
        // take the first candidate store for the requested storage medium
        auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium);
        if (stores.empty()) {
            return Status::InternalError("failed to get root path for create tablet");
        }

        *dest_store = stores.front();
    }

    const auto& src_path = tablet->data_dir()->path();
    if (src_path == (*dest_store)->path()) {
        LOG_WARNING("tablet is already on specified path").tag("path", src_path);
        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
                                                        src_path);
    }

    // check local disk capacity
    int64_t tablet_size = tablet->tablet_local_size();
    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
        return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
                                             (*dest_store)->path(), tablet_size);
    }
    return Status::OK();
}
376
377
// Return `true` if report success
378
bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info,
379
0
                   std::string_view name) {
380
0
    TMasterResult result;
381
0
    Status status = MasterServerClient::instance()->report(request, &result);
382
0
    if (!status.ok()) [[unlikely]] {
383
0
        LOG_WARNING("failed to report {}", name)
384
0
                .tag("host", cluster_info->master_fe_addr.hostname)
385
0
                .tag("port", cluster_info->master_fe_addr.port)
386
0
                .error(status);
387
0
        return false;
388
0
    }
389
390
0
    else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
391
0
        LOG_WARNING("failed to report {}", name)
392
0
                .tag("host", cluster_info->master_fe_addr.hostname)
393
0
                .tag("port", cluster_info->master_fe_addr.port)
394
0
                .error(result.status);
395
0
        return false;
396
0
    }
397
398
0
    return true;
399
0
}
400
401
// Common submission path shared by the worker pools.
//
// 1. Registers the task signature. A duplicated request is logged and reported as OK.
// 2. Stamps the receiving time on the task.
// 3. Hands the task to `submit_op`.
// If `submit_op` fails, the registration is rolled back so that a later retry of the same task
// is not mistaken for a duplicate.
Status _submit_task(const TAgentTaskRequest& task,
                    std::function<Status(const TAgentTaskRequest&)> submit_op) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;

    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);
    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;

    if (!register_task_info(task_type, signature)) {
        LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
        // Duplicated task request, just return OK
        return Status::OK();
    }

    // TODO(plat1ko): check task request member

    // Set the receiving time of task so that we can determine whether it is timed out later
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
    // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok
    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
    auto st = submit_op(task);
    if (!st.ok()) [[unlikely]] {
        // The task never reached a worker, so its callback (presumably where the signature is
        // normally removed) will not run. Without this, every retry of this task from FE would
        // be rejected as a duplicate.
        remove_task_info(task_type, signature);
        LOG_WARNING("failed to submit task")
                .tag("type", type_str)
                .tag("signature", signature)
                .error(st);
        return st;
    }

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
431
432
// Latency recorder registered as ("doris_pk", "publish_version"); not used in this section —
// presumably recorded by the publish-version path.
bvar::LatencyRecorder g_publish_version_latency{"doris_pk", "publish_version"};

// Per-type task gauges registered under the "task" prefix. add_task_count() adds +1 when a task
// is submitted and -1 after its callback ran. Most are referenced through the ADD_TASK_COUNT
// macro, which requires the variable name `<TTaskType>_count`; the registered name may differ
// (e.g. CREATE -> "CREATE_TABLE").
bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count{"task", "ALTER_INVERTED_INDEX"};
bvar::Adder<uint64_t> CHECK_CONSISTENCY_count{"task", "CHECK_CONSISTENCY"};
bvar::Adder<uint64_t> UPLOAD_count{"task", "UPLOAD"};
bvar::Adder<uint64_t> DOWNLOAD_count{"task", "DOWNLOAD"};
bvar::Adder<uint64_t> MAKE_SNAPSHOT_count{"task", "MAKE_SNAPSHOT"};
bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count{"task", "RELEASE_SNAPSHOT"};
bvar::Adder<uint64_t> MOVE_count{"task", "MOVE"};
bvar::Adder<uint64_t> COMPACTION_count{"task", "COMPACTION"};
bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count{"task", "PUSH_STORAGE_POLICY"};
bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count{"task", "PUSH_INDEX_POLICY"};
bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count{"task", "PUSH_COOLDOWN_CONF"};
bvar::Adder<uint64_t> CREATE_count{"task", "CREATE_TABLE"};
bvar::Adder<uint64_t> DROP_count{"task", "DROP_TABLE"};
bvar::Adder<uint64_t> PUBLISH_VERSION_count{"task", "PUBLISH_VERSION"};
bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count{"task", "CLEAR_TRANSACTION_TASK"};
bvar::Adder<uint64_t> DELETE_count{"task", "DELETE"};
bvar::Adder<uint64_t> PUSH_count{"task", "PUSH"};
bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count{"task", "UPDATE_TABLET_META_INFO"};
bvar::Adder<uint64_t> ALTER_count{"task", "ALTER_TABLE"};
bvar::Adder<uint64_t> CLONE_count{"task", "CLONE"};
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count{"task", "STORAGE_MEDIUM_MIGRATE"};
bvar::Adder<uint64_t> GC_BINLOG_count{"task", "GC_BINLOG"};
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count{"task", "UPDATE_VISIBLE_VERSION"};
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count{"task", "CALCULATE_DELETE_BITMAP"};
458
459
23
void add_task_count(const TAgentTaskRequest& task, int n) {
460
    // clang-format off
461
23
    switch (task.task_type) {
462
0
    #define ADD_TASK_COUNT(type) \
463
21
    case TTaskType::type:        \
464
42
        type##_count << n;       \
465
21
        return;
466
0
    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
467
0
    ADD_TASK_COUNT(CHECK_CONSISTENCY)
468
0
    ADD_TASK_COUNT(UPLOAD)
469
0
    ADD_TASK_COUNT(DOWNLOAD)
470
0
    ADD_TASK_COUNT(MAKE_SNAPSHOT)
471
0
    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
472
0
    ADD_TASK_COUNT(MOVE)
473
0
    ADD_TASK_COUNT(COMPACTION)
474
0
    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
475
0
    ADD_TASK_COUNT(PUSH_INDEX_POLICY)
476
0
    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
477
21
    ADD_TASK_COUNT(CREATE)
478
0
    ADD_TASK_COUNT(DROP)
479
0
    ADD_TASK_COUNT(PUBLISH_VERSION)
480
0
    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
481
0
    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
482
0
    ADD_TASK_COUNT(CLONE)
483
0
    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
484
0
    ADD_TASK_COUNT(GC_BINLOG)
485
0
    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
486
0
    ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
487
0
    #undef ADD_TASK_COUNT
488
0
    case TTaskType::REALTIME_PUSH:
489
0
    case TTaskType::PUSH:
490
0
        if (task.push_req.push_type == TPushType::LOAD_V2) {
491
0
            PUSH_count << n;
492
0
        } else if (task.push_req.push_type == TPushType::DELETE) {
493
0
            DELETE_count << n;
494
0
        }
495
0
        return;
496
2
    case TTaskType::ALTER:
497
2
    {
498
2
        ALTER_count << n;
499
        // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
500
2
        if (n > 0) {
501
            // only count fragment when task is actually starting
502
1
            doris::g_fragment_executing_count << 1;
503
1
            int64_t now = duration_cast<std::chrono::milliseconds>(
504
1
                                std::chrono::system_clock::now().time_since_epoch())
505
1
                                .count();
506
1
            g_fragment_last_active_time.set_value(now);
507
1
        }
508
2
        return;
509
0
    }
510
0
    default:
511
0
        return;
512
23
    }
513
    // clang-format on
514
23
}
515
516
// Report counters registered under the "report" prefix: one total/failed pair per report kind.
// They are not updated in this section; presumably the report callbacks increment them.
bvar::Adder<uint64_t> report_task_total{"report", "task_total"};
bvar::Adder<uint64_t> report_task_failed{"report", "task_failed"};
bvar::Adder<uint64_t> report_disk_total{"report", "disk_total"};
bvar::Adder<uint64_t> report_disk_failed{"report", "disk_failed"};
bvar::Adder<uint64_t> report_tablet_total{"report", "tablet_total"};
bvar::Adder<uint64_t> report_tablet_failed{"report", "tablet_failed"};
bvar::Adder<uint64_t> report_index_policy_total{"report", "index_policy_total"};
bvar::Adder<uint64_t> report_index_policy_failed{"report", "index_policy_failed"};
524
525
} // namespace
526
527
// Create a pool named "TaskWP_<name>" whose thread count is fixed at `worker_count`
// (min == max). Failure to build the thread pool is fatal.
TaskWorkerPool::TaskWorkerPool(
        std::string_view name, int worker_count,
        std::function<void(const TAgentTaskRequest& task)> callback,
        std::function<void(const TAgentTaskRequest& task)> pre_submit_callback)
        : _callback(std::move(callback)), _pre_submit_callback(std::move(pre_submit_callback)) {
    ThreadPoolBuilder builder(fmt::format("TaskWP_{}", name));
    builder.set_min_threads(worker_count).set_max_threads(worker_count);
    const auto st = builder.build(&_thread_pool);
    CHECK(st.ok()) << name << ": " << st;
}
538
539
3
// Shut the pool down before members are destroyed; stop() is safe to call repeatedly.
TaskWorkerPool::~TaskWorkerPool() {
    this->stop();
}
542
543
6
void TaskWorkerPool::stop() {
544
6
    if (_stopped.exchange(true)) {
545
3
        return;
546
3
    }
547
548
3
    if (_thread_pool) {
549
3
        _thread_pool->shutdown();
550
3
    }
551
3
}
552
553
8
// Queue `task` on the thread pool via the shared _submit_task() path.
// The optional pre-submit hook runs on the caller's thread before queuing. The per-type task
// gauge is incremented here and decremented after the callback ran. If the pool rejects the
// task, the gauge is decremented immediately instead.
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
    return _submit_task(task, [this](auto&& task) {
        if (_pre_submit_callback) {
            _pre_submit_callback(task);
        }
        add_task_count(task, 1);
        auto st = _thread_pool->submit_func([this, task]() {
            _callback(task);
            add_task_count(task, -1);
        });
        if (!st.ok()) {
            // The callback will never run, so undo the increment above to keep the gauge
            // balanced.
            // NOTE(review): for ALTER tasks, add_task_count(task, 1) also bumps
            // g_fragment_executing_count, which add_task_count(task, -1) does not revert.
            add_task_count(task, -1);
        }
        return st;
    });
}
565
566
// Start `normal_worker_count` threads running normal_loop() and `high_prior_worker_count`
// threads running high_prior_loop(). Failure to create any thread is fatal.
PriorTaskWorkerPool::PriorTaskWorkerPool(
        const std::string& name, int normal_worker_count, int high_prior_worker_count,
        std::function<void(const TAgentTaskRequest& task)> callback)
        : _callback(std::move(callback)) {
    const auto spawn_workers = [&](const char* category, int count,
                                   const std::function<void()>& loop) {
        for (int created = 0; created < count; ++created) {
            auto st = Thread::create(category, name, loop, &_workers.emplace_back());
            CHECK(st.ok()) << name << ": " << st;
        }
    };

    spawn_workers("Normal", normal_worker_count, [this] { normal_loop(); });
    spawn_workers("HighPrior", high_prior_worker_count, [this] { high_prior_loop(); });
}
582
583
1
// Stop and join all workers before members are destroyed; stop() is safe to call repeatedly.
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
    this->stop();
}
586
587
2
void PriorTaskWorkerPool::stop() {
588
2
    {
589
2
        std::lock_guard lock(_mtx);
590
2
        if (_stopped) {
591
1
            return;
592
1
        }
593
594
1
        _stopped = true;
595
1
    }
596
0
    _normal_condv.notify_all();
597
1
    _high_prior_condv.notify_all();
598
599
2
    for (auto&& w : _workers) {
600
2
        if (w) {
601
2
            w->join();
602
2
        }
603
2
    }
604
1
}
605
606
6
// Copy `task` into the high-priority queue when its priority is set to HIGH, otherwise into the
// normal queue, then wake a worker. A high-priority task also wakes a normal worker, because
// normal_loop() drains the high-priority queue before the normal one.
Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
    return _submit_task(task, [this](auto&& task) {
        auto req = std::make_unique<TAgentTaskRequest>(task);
        add_task_count(*req, 1);
        const bool high_prior = req->__isset.priority && req->priority == TPriority::HIGH;

        std::lock_guard guard(_mtx);
        if (!high_prior) {
            _normal_queue.push_back(std::move(req));
            _normal_condv.notify_one();
        } else {
            _high_prior_queue.push_back(std::move(req));
            _high_prior_condv.notify_one();
            _normal_condv.notify_one();
        }
        return Status::OK();
    });
}
623
624
0
// Submit a HIGH-priority task, cancelling a still-queued normal-priority copy of it.
// - Signature not registered yet: register it and queue the task as high priority.
// - Registered, and a task of the same type and signature is still waiting in the normal queue:
//   drop that queued task and queue this one as high priority instead.
// - Otherwise (presumably already in the high-priority queue or already picked up by a worker):
//   discard the new request.
// `task.recv_time` is stamped before the request is copied, so the queued copy carries it too.
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;
    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);

    // Set the receiving time of task so that we can determine whether it is timed out later.
    // This must happen before `req` is copied from `task`; otherwise the queued request would
    // not carry it.
    task.__set_recv_time(time(nullptr));
    auto req = std::make_unique<TAgentTaskRequest>(task);

    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
    {
        std::lock_guard lock(s_task_signatures_mtx);
        auto& set = s_task_signatures[task_type];
        if (!set.contains(signature)) {
            // If it doesn't exist, put it directly into the priority queue
            add_task_count(*req, 1);
            set.insert(signature);
            std::lock_guard temp_lock(_mtx);
            _high_prior_queue.push_back(std::move(req));
            _high_prior_condv.notify_one();
            _normal_condv.notify_one();
        } else {
            std::lock_guard temp_lock(_mtx);
            auto it = std::find_if(_normal_queue.begin(), _normal_queue.end(),
                                   [task_type, signature](const auto& queued) {
                                       return queued->task_type == task_type &&
                                              queued->signature == signature;
                                   });
            if (it != _normal_queue.end()) {
                // Cancel the queued normal task and enqueue the high-priority one instead.
                // The gauge stays balanced: the cancelled task was counted when queued and will
                // never be decremented by a worker, so the replacement inherits its count.
                _normal_queue.erase(it);
                _high_prior_queue.push_back(std::move(req));
                _high_prior_condv.notify_one();
                _normal_condv.notify_one();
            } else {
                // Not waiting in the normal queue, so there is nothing to cancel.
                LOG_INFO("task has already existed in high prior queue.")
                        .tag("signature", signature);
            }
        }
    }

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
669
670
1
void PriorTaskWorkerPool::normal_loop() {
671
4
    while (true) {
672
4
        std::unique_ptr<TAgentTaskRequest> req;
673
674
4
        {
675
4
            std::unique_lock lock(_mtx);
676
8
            _normal_condv.wait(lock, [&] {
677
8
                return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped;
678
8
            });
679
680
4
            if (_stopped) {
681
1
                return;
682
1
            }
683
684
3
            if (!_high_prior_queue.empty()) {
685
1
                req = std::move(_high_prior_queue.front());
686
1
                _high_prior_queue.pop_front();
687
2
            } else if (!_normal_queue.empty()) {
688
2
                req = std::move(_normal_queue.front());
689
2
                _normal_queue.pop_front();
690
2
            } else {
691
0
                continue;
692
0
            }
693
3
        }
694
695
3
        _callback(*req);
696
3
        add_task_count(*req, -1);
697
3
    }
698
1
}
699
700
1
void PriorTaskWorkerPool::high_prior_loop() {
701
3
    while (true) {
702
3
        std::unique_ptr<TAgentTaskRequest> req;
703
704
3
        {
705
3
            std::unique_lock lock(_mtx);
706
6
            _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; });
707
708
3
            if (_stopped) {
709
1
                return;
710
1
            }
711
712
2
            if (_high_prior_queue.empty()) {
713
0
                continue;
714
0
            }
715
716
2
            req = std::move(_high_prior_queue.front());
717
2
            _high_prior_queue.pop_front();
718
2
        }
719
720
0
        _callback(*req);
721
2
        add_task_count(*req, -1);
722
2
    }
723
1
}
724
725
ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
                           std::function<void()> callback)
        : _name(std::move(name)) {
    // Body of the background report thread. It wakes up every `report_interval_s` seconds, or
    // earlier when notify()/stop() signal `_condv`, and runs `callback` once the FE master
    // address is known. While running, the worker is registered as a report listener of the
    // storage engine.
    auto loop_body = [this, cluster_info, report_interval_s, callback = std::move(callback)] {
        auto& storage_engine = ExecEnv::GetInstance()->storage_engine();
        storage_engine.register_report_listener(this);

        const auto interval = std::chrono::seconds(report_interval_s);
        for (;;) {
            bool should_exit = false;
            {
                std::unique_lock guard(_mtx);
                _condv.wait_for(guard, interval, [&] { return _stopped || _signal; });
                should_exit = _stopped;
                if (!should_exit) {
                    // Consume any pending notification before reporting.
                    _signal = false;
                }
            }
            if (should_exit) {
                break;
            }

            // A zero port means no heartbeat has been received from the frontend yet.
            if (cluster_info->master_fe_addr.port == 0) {
                LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report";
                continue;
            }

            callback();
        }

        storage_engine.deregister_report_listener(this);
    };

    auto status = Thread::create("ReportWorker", _name, loop_body, &_thread);
    CHECK(status.ok()) << _name << ": " << status;
}
761
762
1
ReportWorker::~ReportWorker() {
    // Stop and join the report thread (no-op if stop() was already called) so that it no
    // longer touches this object once destruction proceeds.
    stop();
}
765
766
4
void ReportWorker::notify() {
767
4
    {
768
4
        std::lock_guard lock(_mtx);
769
4
        _signal = true;
770
4
    }
771
4
    _condv.notify_all();
772
4
}
773
774
2
void ReportWorker::stop() {
775
2
    {
776
2
        std::lock_guard lock(_mtx);
777
2
        if (_stopped) {
778
1
            return;
779
1
        }
780
781
1
        _stopped = true;
782
1
    }
783
0
    _condv.notify_all();
784
1
    if (_thread) {
785
1
        _thread->join();
786
1
    }
787
1
}
788
789
0
void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& index_req = req.alter_inverted_index_req;
    LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature
              << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;

    // Run the index change only when the tablet can be found; otherwise report NotFound.
    Status status;
    auto tablet = engine.tablet_mgr().get_tablet(index_req.tablet_id);
    if (tablet != nullptr) {
        EngineCloudIndexChangeTask index_change_task(engine, req.alter_inverted_index_req);
        status = index_change_task.execute();
    } else {
        status = Status::NotFound("could not find tablet {}", index_req.tablet_id);
    }

    if (status.ok()) {
        LOG(INFO) << "[index_change]successfully alter inverted index task, signature="
                  << req.signature << ", tablet_id=" << index_req.tablet_id
                  << ", job_id=" << index_req.job_id;
    } else {
        LOG(WARNING) << "[index_change]failed to alter inverted index task, signature="
                     << req.signature << ", tablet_id=" << index_req.tablet_id
                     << ", job_id=" << index_req.job_id << ", error=" << status;
    }

    // Return result to fe, then drop the local bookkeeping entry for this task.
    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_status(status.to_thrift());
    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
822
823
0
void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& index_req = req.alter_inverted_index_req;
    LOG(INFO) << "get alter inverted index task. signature=" << req.signature
              << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;

    // Run the index change (under the task's memory tracker) only when the tablet exists.
    Status status;
    auto tablet = engine.tablet_manager()->get_tablet(index_req.tablet_id);
    if (tablet == nullptr) {
        status = Status::NotFound("could not find tablet {}", index_req.tablet_id);
    } else {
        EngineIndexChangeTask index_change_task(engine, index_req);
        SCOPED_ATTACH_TASK(index_change_task.mem_tracker());
        status = index_change_task.execute();
    }

    // Return result to fe
    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_signature(req.signature);
    if (status.ok()) {
        LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature
                  << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;
        // Attach the refreshed tablet info. If fetching it fails, that failure becomes the
        // reported task status and the tablet info list is sent empty.
        TTabletInfo tablet_info;
        status = get_tablet_info(engine, index_req.tablet_id, index_req.schema_hash, &tablet_info);
        std::vector<TTabletInfo> tablet_infos;
        if (status.ok()) {
            tablet_infos.push_back(tablet_info);
        }
        finish_request.__set_finish_tablet_infos(tablet_infos);
    } else {
        LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature
                     << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id
                     << ", error=" << status;
    }
    finish_request.__set_task_status(status.to_thrift());
    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
865
866
0
// Handles an update-tablet-meta task. For every TTabletMetaInfo in the request, applies each
// property that is set (partition id, storage policy, in-memory flag, compaction settings,
// binlog config, single-replica compaction, auto compaction, index-on-load, ...) to the local
// tablet, and persists the tablet meta when anything was changed.
//
// Error handling:
// - A missing tablet or an invalid property is recorded in `status`; the last error wins.
// - After an error, the remaining properties of that tablet are skipped.
// - Properties already applied to that tablet's in-memory meta are still persisted, so the
//   in-memory and on-disk meta do not diverge.
//
// The result is reported to the FE only when `req.signature != -1`.
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;

    Status status;
    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
    for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
        auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
        if (tablet == nullptr) {
            status = Status::NotFound("tablet not found");
            LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
                         << tablet_meta_info.tablet_id;
            continue;
        }

        bool need_to_save = false;
        // Single-pass block: `break` abandons the remaining properties of this tablet on a
        // validation error, but still falls through to save_meta() below so that properties
        // already applied in memory are persisted.
        do {
            if (tablet_meta_info.__isset.partition_id) {
                // for fix partition_id = 0
                LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
                             << " partition id from : " << tablet->tablet_meta()->partition_id()
                             << " to : " << tablet_meta_info.partition_id;
                auto succ = engine.tablet_manager()->update_tablet_partition_id(
                        tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
                if (!succ) {
                    std::string err_msg = fmt::format(
                            "change be tablet id : {} partition_id : {} failed",
                            tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
                    LOG(WARNING) << err_msg;
                    status = Status::InvalidArgument(err_msg);
                    break;
                }
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.storage_policy_id) {
                tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.is_in_memory) {
                // Take the header lock before touching any schema, consistent with the other
                // schema properties below.
                std::shared_lock rlock(tablet->get_header_lock());
                tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
                        tablet_meta_info.is_in_memory);
                for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                    rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
                }
                tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.compaction_policy) {
                if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY &&
                    tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "invalid compaction policy, only support for size_based or "
                            "time_series");
                    break;
                }
                tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
                if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "only time series compaction policy support time series config");
                    break;
                }
                tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
                        tablet_meta_info.time_series_compaction_goal_size_mbytes);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
                if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "only time series compaction policy support time series config");
                    break;
                }
                tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
                        tablet_meta_info.time_series_compaction_file_count_threshold);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
                if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "only time series compaction policy support time series config");
                    break;
                }
                tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
                        tablet_meta_info.time_series_compaction_time_threshold_seconds);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
                if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "only time series compaction policy support time series config");
                    break;
                }
                tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
                        tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
                if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                    status = Status::InvalidArgument(
                            "only time series compaction policy support time series config");
                    break;
                }
                tablet->tablet_meta()->set_time_series_compaction_level_threshold(
                        tablet_meta_info.time_series_compaction_level_threshold);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) {
                tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group(
                        tablet_meta_info.vertical_compaction_num_columns_per_group);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.replica_id) {
                // NOTE(review): this does not set need_to_save, so a replica_id-only update is
                // not persisted here. Confirm whether that is intended.
                tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
            }
            if (tablet_meta_info.__isset.binlog_config) {
                // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
                const auto& t_binlog_config = tablet_meta_info.binlog_config;
                if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
                    !t_binlog_config.__isset.max_bytes ||
                    !t_binlog_config.__isset.max_history_nums) {
                    status = Status::InvalidArgument("invalid binlog config, some fields not set");
                    LOG(WARNING) << fmt::format(
                            "invalid binlog config, some fields not set, tablet_id={}, "
                            "t_binlog_config={}",
                            tablet_meta_info.tablet_id,
                            apache::thrift::ThriftDebugString(t_binlog_config));
                    break;
                }

                BinlogConfig new_binlog_config;
                new_binlog_config = tablet_meta_info.binlog_config;
                LOG(INFO) << fmt::format(
                        "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
                        "new_binlog_config={}",
                        tablet_meta_info.tablet_id,
                        tablet->tablet_meta()->binlog_config().to_string(),
                        new_binlog_config.to_string());
                tablet->set_binlog_config(new_binlog_config);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.enable_single_replica_compaction) {
                std::shared_lock rlock(tablet->get_header_lock());
                tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
                        tablet_meta_info.enable_single_replica_compaction);
                for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                    rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
                            tablet_meta_info.enable_single_replica_compaction);
                }
                tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
                        tablet_meta_info.enable_single_replica_compaction);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.disable_auto_compaction) {
                std::shared_lock rlock(tablet->get_header_lock());
                tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
                        tablet_meta_info.disable_auto_compaction);
                for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                    rowset_meta->tablet_schema()->set_disable_auto_compaction(
                            tablet_meta_info.disable_auto_compaction);
                }
                tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
                        tablet_meta_info.disable_auto_compaction);
                need_to_save = true;
            }
            if (tablet_meta_info.__isset.skip_write_index_on_load) {
                std::shared_lock rlock(tablet->get_header_lock());
                tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
                        tablet_meta_info.skip_write_index_on_load);
                for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                    rowset_meta->tablet_schema()->set_skip_write_index_on_load(
                            tablet_meta_info.skip_write_index_on_load);
                }
                tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
                        tablet_meta_info.skip_write_index_on_load);
                need_to_save = true;
            }
        } while (false);

        // Persist whatever was applied, even if a later property for this tablet was rejected.
        if (need_to_save) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->save_meta();
        }
    }

    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
    if (req.signature != -1) {
        TFinishTaskRequest finish_task_request;
        finish_task_request.__set_task_status(status.to_thrift());
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
        finish_task_request.__set_task_type(req.task_type);
        finish_task_request.__set_signature(req.signature);
        finish_task(finish_task_request);
        remove_task_info(req.task_type, req.signature);
    }
}
1058
1059
0
void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    // Compute the tablet checksum at the requested version and report it, together with the
    // task status, to the FE.
    const auto& consistency_req = req.check_consistency_req;
    uint32_t checksum = 0;
    EngineChecksumTask checksum_task(engine, consistency_req.tablet_id, consistency_req.schema_hash,
                                     consistency_req.version, &checksum);
    SCOPED_ATTACH_TASK(checksum_task.mem_tracker());
    Status status = checksum_task.execute();

    if (status.ok()) {
        LOG_INFO("successfully check consistency")
                .tag("signature", req.signature)
                .tag("tablet_id", consistency_req.tablet_id)
                .tag("checksum", checksum);
    } else {
        LOG_WARNING("failed to check consistency")
                .tag("signature", req.signature)
                .tag("tablet_id", consistency_req.tablet_id)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_status(status.to_thrift());
    finish_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
    finish_request.__set_request_version(consistency_req.version);

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1090
1091
0
void report_task_callback(const ClusterInfo* cluster_info) {
1092
0
    TReportRequest request;
1093
0
    if (config::report_random_wait) {
1094
0
        random_sleep(5);
1095
0
    }
1096
0
    request.__isset.tasks = true;
1097
0
    {
1098
0
        std::lock_guard lock(s_task_signatures_mtx);
1099
0
        auto& tasks = request.tasks;
1100
0
        for (auto&& [task_type, signatures] : s_task_signatures) {
1101
0
            auto& set = tasks[task_type];
1102
0
            for (auto&& signature : signatures) {
1103
0
                set.insert(signature);
1104
0
            }
1105
0
        }
1106
0
    }
1107
0
    request.__set_backend(BackendOptions::get_local_backend());
1108
0
    request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
1109
0
    bool succ = handle_report(request, cluster_info, "task");
1110
0
    report_task_total << 1;
1111
0
    if (!succ) [[unlikely]] {
1112
0
        report_task_failed << 1;
1113
0
    }
1114
0
}
1115
1116
0
void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
    // Reports capacity/usage of every local data dir plus CPU information to the FE.
    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.disks = true;

    // Refresh and collect per-dir statistics. The returned status is deliberately ignored;
    // whatever was collected is reported.
    std::vector<DataDirInfo> dir_infos;
    static_cast<void>(engine.get_all_data_dir_info(&dir_infos, true /* update */));

    for (const auto& dir : dir_infos) {
        TDisk disk;
        disk.__set_root_path(dir.path);
        disk.__set_path_hash(dir.path_hash);
        disk.__set_storage_medium(dir.storage_medium);
        disk.__set_disk_total_capacity(dir.disk_capacity);
        disk.__set_data_used_capacity(dir.local_used_capacity);
        disk.__set_remote_used_capacity(dir.remote_used_capacity);
        disk.__set_disk_available_capacity(dir.available);
        disk.__set_trash_used_capacity(dir.trash_used_capacity);
        disk.__set_used(dir.is_used);
        request.disks[dir.path] = std::move(disk);
    }

    // A non-positive configured executor size falls back to the number of cores.
    const auto executor_size = config::pipeline_executor_size > 0 ? config::pipeline_executor_size
                                                                  : CpuInfo::num_cores();
    request.__set_num_cores(CpuInfo::num_cores());
    request.__set_pipeline_executor_size(executor_size);

    const bool reported = handle_report(request, cluster_info, "disk");
    report_disk_total << 1;
    if (!reported) [[unlikely]] {
        report_disk_failed << 1;
    }
}
1147
1148
0
void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
    // Sleep a random 1~5 seconds before reporting, so that the FE does not receive many
    // report requests at the same time and fail to process them.
    if (config::report_random_wait) {
        random_sleep(5);
    }
    (void)engine; // To be used in the future

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.disks = true;

    // TODO(deardeng): report disk info in cloud mode. And make it more clear
    //                 that report CPU by using a separte report procedure
    //                 or abstracting disk report as "host info report"
    // A non-positive configured executor size falls back to the number of cores.
    const auto executor_size = config::pipeline_executor_size > 0 ? config::pipeline_executor_size
                                                                  : CpuInfo::num_cores();
    request.__set_num_cores(CpuInfo::num_cores());
    request.__set_pipeline_executor_size(executor_size);

    const bool reported = handle_report(request, cluster_info, "disk");
    report_disk_total << 1;
    if (!reported) [[unlikely]] {
        report_disk_failed << 1;
    }
}
1172
1173
0
// Builds and sends a full tablet report to the FE. The report contains:
// - the tablet list,
// - partition visible versions,
// - the max compaction score,
// - storage policies and resources.
// The tablet list is rebuilt up to 5 times while the report version keeps changing
// concurrently. If it is still stale afterwards, the report is skipped and counted in
// report_all_tablets_requests_skip.
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
    if (config::report_random_wait) {
        random_sleep(5);
    }

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.tablets = true;

    increase_report_version();
    uint64_t report_version = 0;
    for (int i = 0; i < 5; i++) {
        request.tablets.clear();
        report_version = s_report_version;
        engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
        if (report_version == s_report_version) {
            break;
        }
    }

    if (report_version < s_report_version) {
        // TODO llj This can only reduce the possibility for report error, but can't avoid it.
        // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
        // report, and the report version has a small probability that it has not been updated in time. When FE
        // receives this report, it is possible to delete the new tablet.
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
        return;
    }

    std::map<int64_t, int64_t> partitions_version;
    engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
    request.__set_partitions_version(std::move(partitions_version));

    int64_t max_compaction_score =
            std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
                     DorisMetrics::instance()->tablet_base_max_compaction_score->value());
    request.__set_tablet_max_compaction_score(max_compaction_score);
    request.__set_report_version(report_version);

    // report storage policy and resource
    auto& storage_policy_list = request.storage_policy;
    for (auto [id, version] : get_storage_policy_ids()) {
        auto& storage_policy = storage_policy_list.emplace_back();
        storage_policy.__set_id(id);
        storage_policy.__set_version(version);
    }
    request.__isset.storage_policy = true;

    auto& resource_list = request.resource;
    for (const auto& [id_str, version] : get_storage_resource_ids()) {
        int64_t id = -1;
        const char* const first = id_str.data();
        const char* const last = first + id_str.size();
        // Append an entry only after the whole id parses. Otherwise a malformed id would leave
        // an entry with neither id nor version set in the report.
        if (auto [ptr, ec] = std::from_chars(first, last, id); ec != std::errc {} || ptr != last)
                [[unlikely]] {
            LOG(ERROR) << "invalid resource id format: " << id_str;
            continue;
        }
        auto& resource = resource_list.emplace_back();
        resource.__set_id(id);
        resource.__set_version(version);
    }
    request.__isset.resource = true;

    bool succ = handle_report(request, cluster_info, "tablet");
    report_tablet_total << 1;
    if (!succ) [[unlikely]] {
        report_tablet_failed << 1;
    }
}
1241
1242
0
void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
    // Sleep a random 1~5 seconds before reporting, so that the FE does not receive many
    // report requests at the same time and fail to process them.
    if (config::report_random_wait) {
        random_sleep(5);
    }

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.tablets = true;

    increase_report_version();
    uint64_t report_version = 0;
    uint64_t total_num_tablets = 0;
    // Rebuild the tablet list, at most 5 times, until no concurrent report-version bump is
    // observed during a build.
    int attempt = 0;
    do {
        request.tablets.clear();
        report_version = s_report_version;
        engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
    } while (report_version != s_report_version && ++attempt < 5);

    if (report_version < s_report_version) {
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
        return;
    }

    request.__set_report_version(report_version);
    request.__set_num_tablets(total_num_tablets);

    const bool reported = handle_report(request, cluster_info, "tablet");
    report_tablet_total << 1;
    if (!reported) [[unlikely]] {
        report_tablet_failed << 1;
    }
}
1281
1282
0
void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    // Uploads snapshot files for a backup job and reports the uploaded files per tablet back
    // to the FE.
    const auto& upload_req = req.upload_req;
    LOG(INFO) << "get upload task. signature=" << req.signature
              << ", job_id=" << upload_req.job_id;

    // Declared before the loader and the attach guard so that it is destroyed after them,
    // in the same order as before.
    std::map<int64_t, std::vector<std::string>> tablet_files;
    auto loader = std::make_unique<SnapshotLoader>(engine, env, upload_req.job_id, req.signature,
                                                   upload_req.broker_addr, upload_req.broker_prop);
    SCOPED_ATTACH_TASK(loader->resource_ctx());

    // Fall back to the broker backend and an empty location when the request omits them.
    const auto backend_type = upload_req.__isset.storage_backend
                                      ? upload_req.storage_backend
                                      : TStorageBackendType::type::BROKER;
    const std::string location = upload_req.__isset.location ? upload_req.location : "";

    Status status = loader->init(backend_type, location);
    if (status.ok()) {
        status = loader->upload(upload_req.src_dest_map, &tablet_files);
    }

    if (status.ok()) {
        LOG_INFO("successfully upload")
                .tag("signature", req.signature)
                .tag("job_id", upload_req.job_id);
    } else {
        LOG_WARNING("failed to upload")
                .tag("signature", req.signature)
                .tag("job_id", upload_req.job_id)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_status(status.to_thrift());
    finish_request.__set_tablet_files(tablet_files);

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1322
1323
0
// Handles a DOWNLOAD agent task on a local-storage BE.
//
// Two transfer modes exist:
//  * `remote_tablet_snapshots` set: SnapshotLoader::remote_http_download fetches them.
//  * otherwise: a SnapshotLoader is initialized for the requested storage backend
//    (BROKER by default) and downloads every entry of `src_dest_map`.
// The ids of the downloaded tablets and the final status are reported to the FE, and
// the task is then removed from the local task registry.
void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& dl_req = req.download_req;
    LOG(INFO) << "get download task. signature=" << req.signature
              << ", job_id=" << dl_req.job_id
              << ", task detail: " << apache::thrift::ThriftDebugString(dl_req);

    std::vector<int64_t> downloaded_tablet_ids;
    Status status = Status::OK();

    if (!dl_req.__isset.remote_tablet_snapshots) {
        // Each branch owns its loader so SCOPED_ATTACH_TASK is bound to that loader only.
        auto loader = std::make_unique<SnapshotLoader>(engine, env, dl_req.job_id, req.signature,
                                                       dl_req.broker_addr, dl_req.broker_prop);
        SCOPED_ATTACH_TASK(loader->resource_ctx());
        const auto backend_type = dl_req.__isset.storage_backend
                                          ? dl_req.storage_backend
                                          : TStorageBackendType::type::BROKER;
        status = loader->init(backend_type, dl_req.__isset.location ? dl_req.location : "");
        if (status.ok()) {
            status = loader->download(dl_req.src_dest_map, &downloaded_tablet_ids);
        }
    } else {
        auto loader =
                std::make_unique<SnapshotLoader>(engine, env, dl_req.job_id, req.signature);
        SCOPED_ATTACH_TASK(loader->resource_ctx());
        status = loader->remote_http_download(dl_req.remote_tablet_snapshots,
                                              &downloaded_tablet_ids);
    }

    if (status.ok()) {
        LOG_INFO("successfully download")
                .tag("signature", req.signature)
                .tag("job_id", dl_req.job_id);
    } else {
        LOG_WARNING("failed to download")
                .tag("signature", req.signature)
                .tag("job_id", dl_req.job_id)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());
    finish_req.__set_downloaded_tablet_ids(downloaded_tablet_ids);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1374
1375
0
// Handles a DOWNLOAD agent task on a cloud-mode BE.
//
// `remote_tablet_snapshots` is not supported in cloud mode and yields
// NOT_IMPLEMENTED_ERROR. Otherwise a CloudSnapshotLoader is initialized for the
// requested storage backend (BROKER by default), location and vault, and downloads
// every entry of `src_dest_map`.
//
// The result is ALWAYS logged, reported to the FE via finish_task, and the task is
// removed from the local task registry. Previously the unsupported
// remote-snapshot path skipped all three, so the FE never learned about the failure
// and the task-info entry was never removed.
void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& download_request = req.download_req;
    LOG(INFO) << "get download task. signature=" << req.signature
              << ", job_id=" << download_request.job_id
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);

    std::vector<int64_t> transferred_tablet_ids;

    auto status = Status::OK();
    if (download_request.__isset.remote_tablet_snapshots) {
        status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
                "remote tablet snapshot is not supported.");
    } else {
        auto loader = std::make_unique<CloudSnapshotLoader>(
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
                download_request.broker_prop);
        SCOPED_ATTACH_TASK(loader->resource_ctx());
        status = loader->init(download_request.__isset.storage_backend
                                      ? download_request.storage_backend
                                      : TStorageBackendType::type::BROKER,
                              download_request.__isset.location ? download_request.location : "",
                              download_request.vault_id);
        if (status.ok()) {
            status = loader->download(download_request.src_dest_map, &transferred_tablet_ids);
        }
    }

    if (!status.ok()) {
        LOG_WARNING("failed to download")
                .tag("signature", req.signature)
                .tag("job_id", download_request.job_id)
                .error(status);
    } else {
        LOG_INFO("successfully download")
                .tag("signature", req.signature)
                .tag("job_id", download_request.job_id);
    }

    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());
    finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids);

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
1423
1424
0
// Handles a MAKE_SNAPSHOT task.
//
// The snapshot manager creates the snapshot. When the request sets `list_files`, the
// files under `<snapshot_path>/<tablet_id>/<schema_hash>/` are listed too, and a
// listing error replaces the (successful) snapshot status. The snapshot path, file
// names and final status are reported to the FE before the task is removed from the
// local task registry.
void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& snap_req = req.snapshot_req;
    LOG(INFO) << "get snapshot task. signature=" << req.signature;

    std::string snapshot_path;
    bool allow_incremental_clone = false; // out-param required by make_snapshot; not used
    Status status = engine.snapshot_mgr()->make_snapshot(snap_req, &snapshot_path,
                                                         &allow_incremental_clone);

    std::vector<std::string> snapshot_files;
    if (status.ok() && snap_req.__isset.list_files) {
        // snapshot_path looks like data/snapshot/20180417205230.1.86400; the files live
        // one level deeper, under tablet_id/schema_hash/.
        const io::Path tablet_dir = fmt::format("{}/{}/{}/", snapshot_path, snap_req.tablet_id,
                                                snap_req.schema_hash);
        std::vector<io::FileInfo> entries;
        bool exists = true;
        status = io::global_local_filesystem()->list(tablet_dir, true, &entries, &exists);
        if (status.ok()) {
            for (const auto& entry : entries) {
                snapshot_files.push_back(entry.file_name);
            }
        }
    }

    if (status.ok()) {
        LOG_INFO("successfully make snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", snap_req.tablet_id)
                .tag("version", snap_req.version)
                .tag("snapshot_path", snapshot_path);
    } else {
        LOG_WARNING("failed to make snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", snap_req.tablet_id)
                .tag("version", snap_req.version)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_snapshot_path(snapshot_path);
    finish_req.__set_snapshot_files(snapshot_files);
    finish_req.__set_task_status(status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1474
1475
0
// Handles a RELEASE_SNAPSHOT task on a local-storage BE.
//
// Asks the snapshot manager to release the snapshot at the requested path, logs the
// outcome, reports it to the FE and removes the task from the local task registry.
void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const std::string& snapshot_path = req.release_snapshot_req.snapshot_path;
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;

    Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);
    if (status.ok()) {
        LOG_INFO("successfully release snapshot")
                .tag("signature", req.signature)
                .tag("snapshot_path", snapshot_path);
    } else {
        LOG_WARNING("failed to release snapshot")
                .tag("signature", req.signature)
                .tag("snapshot_path", snapshot_path)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1502
1503
0
// Handles a RELEASE_SNAPSHOT task on a cloud-mode BE.
//
// Unlike the local-storage variant, the snapshot is identified by tablet id and
// released through cloud_snapshot_mgr(), passing along whether the job completed.
// The outcome is logged, reported to the FE, and the task is removed from the local
// task registry.
void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& release_req = req.release_snapshot_req;
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;

    Status status = engine.cloud_snapshot_mgr().release_snapshot(release_req.tablet_id,
                                                                 release_req.is_job_completed);
    if (status.ok()) {
        LOG_INFO("successfully release snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", release_req.tablet_id)
                .tag("is_job_completed", release_req.is_job_completed);
    } else {
        LOG_WARNING("failed to release snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", release_req.tablet_id)
                .tag("is_job_completed", release_req.is_job_completed)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1533
1534
0
// Handles a MOVE_DIR task on a local-storage BE.
//
// Looks up the target tablet and moves the downloaded directory `src` into it via
// SnapshotLoader::move (overwrite = true). A missing tablet is reported as
// InvalidArgument. The outcome is logged, reported to the FE, and the task is removed
// from the local task registry.
void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& mv_req = req.move_dir_req;
    LOG(INFO) << "get move dir task. signature=" << req.signature
              << ", job_id=" << mv_req.job_id;

    Status status;
    auto tablet = engine.tablet_manager()->get_tablet(mv_req.tablet_id);
    if (tablet != nullptr) {
        // NOTE(review): upload/download pass req.signature as the 4th SnapshotLoader
        // argument, whereas tablet_id is passed here — confirm this is intended.
        SnapshotLoader loader(engine, env, mv_req.job_id, mv_req.tablet_id);
        SCOPED_ATTACH_TASK(loader.resource_ctx());
        status = loader.move(mv_req.src, tablet, true);
    } else {
        status = Status::InvalidArgument("Could not find tablet");
    }

    if (status.ok()) {
        LOG_INFO("successfully move dir")
                .tag("signature", req.signature)
                .tag("job_id", mv_req.job_id)
                .tag("tablet_id", mv_req.tablet_id)
                .tag("src", mv_req.src);
    } else {
        LOG_WARNING("failed to move dir")
                .tag("signature", req.signature)
                .tag("job_id", mv_req.job_id)
                .tag("tablet_id", mv_req.tablet_id)
                .tag("src", mv_req.src)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1573
1574
0
// Handles a MOVE_DIR task on a cloud-mode BE.
//
// No local directory is moved. The task is served by committing the tablet's snapshot
// through cloud_snapshot_mgr(). `env` is unused here. The outcome is logged, reported
// to the FE, and the task is removed from the local task registry.
void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& mv_req = req.move_dir_req;
    LOG(INFO) << "get move dir task. signature=" << req.signature
              << ", job_id=" << mv_req.job_id;

    Status status = engine.cloud_snapshot_mgr().commit_snapshot(mv_req.tablet_id);
    if (status.ok()) {
        LOG_INFO("successfully move dir")
                .tag("signature", req.signature)
                .tag("job_id", mv_req.job_id)
                .tag("tablet_id", mv_req.tablet_id);
    } else {
        LOG_WARNING("failed to move dir")
                .tag("signature", req.signature)
                .tag("job_id", mv_req.job_id)
                .tag("tablet_id", mv_req.tablet_id)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1603
1604
0
// Handles a manual table-compaction request.
//
// A type string of "base" selects base compaction; any other value falls back to
// cumulative compaction. The compaction is submitted asynchronously to the engine
// only if the tablet exists and currently allows that compaction type. This callback
// does not call finish_task/remove_task_info.
//
// Previously, a missing tablet was ignored silently and the submit-failure log lacked
// the tablet id. Both cases now log enough context to trace the request.
void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& compaction_req = req.compaction_req;

    LOG(INFO) << "get compaction task. signature=" << req.signature
              << ", compaction_type=" << compaction_req.type;

    const CompactionType compaction_type = compaction_req.type == "base"
                                                   ? CompactionType::BASE_COMPACTION
                                                   : CompactionType::CUMULATIVE_COMPACTION;

    auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id);
    if (tablet_ptr == nullptr) {
        LOG(WARNING) << "could not find tablet for compaction task. signature=" << req.signature
                     << ", tablet_id=" << compaction_req.tablet_id;
        return;
    }

    auto* data_dir = tablet_ptr->data_dir();
    if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
        LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id()
                     << ", compaction_type=" << compaction_type;
        return;
    }

    Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false);
    if (!status.ok()) {
        LOG(WARNING) << "failed to submit table compaction task. tablet_id="
                     << tablet_ptr->tablet_id() << ", error=" << status;
    }
}
1632
1633
namespace {
1634
1635
0
void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1636
0
    Status st;
1637
0
    io::RemoteFileSystemSPtr fs;
1638
1639
0
    if (!existed_fs) {
1640
        // No such FS instance on BE
1641
0
        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
1642
0
                                            std::to_string(param.id));
1643
0
        if (!res.has_value()) {
1644
0
            st = std::move(res).error();
1645
0
        } else {
1646
0
            fs = std::move(res).value();
1647
0
        }
1648
0
    } else {
1649
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
1650
0
        auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
1651
0
        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
1652
0
        S3ClientConf conf = std::move(new_s3_conf.client_conf);
1653
0
        st = client->reset(conf);
1654
0
        fs = std::move(existed_fs);
1655
0
    }
1656
1657
0
    if (!st.ok()) {
1658
0
        LOG(WARNING) << "update s3 resource failed: " << st;
1659
0
    } else {
1660
0
        LOG_INFO("successfully update s3 resource")
1661
0
                .tag("resource_id", param.id)
1662
0
                .tag("resource_name", param.name);
1663
0
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1664
0
    }
1665
0
}
1666
1667
0
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1668
0
    Status st;
1669
0
    io::RemoteFileSystemSPtr fs;
1670
0
    std::string root_path =
1671
0
            param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";
1672
1673
0
    if (!existed_fs) {
1674
        // No such FS instance on BE
1675
0
        auto res = io::HdfsFileSystem::create(
1676
0
                param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
1677
0
                std::to_string(param.id), nullptr, std::move(root_path));
1678
0
        if (!res.has_value()) {
1679
0
            st = std::move(res).error();
1680
0
        } else {
1681
0
            fs = std::move(res).value();
1682
0
        }
1683
1684
0
    } else {
1685
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
1686
        // TODO(plat1ko): update hdfs conf
1687
0
        fs = std::move(existed_fs);
1688
0
    }
1689
1690
0
    if (!st.ok()) {
1691
0
        LOG(WARNING) << "update hdfs resource failed: " << st;
1692
0
    } else {
1693
0
        LOG_INFO("successfully update hdfs resource")
1694
0
                .tag("resource_id", param.id)
1695
0
                .tag("resource_name", param.name)
1696
0
                .tag("root_path", fs->root_path().string());
1697
0
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1698
0
    }
1699
0
}
1700
1701
} // namespace
1702
1703
0
// Applies a PUSH_STORAGE_POLICY request in three steps:
//  1. refresh storage resources (S3 or HDFS); entries whose version is not newer than
//     the cached one are skipped as stale;
//  2. delete dropped storage policies;
//  3. install storage policies that are new or carry a newer version.
// `engine` is not used by this callback.
void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& policy_req = req.push_storage_policy_req;

    // 1. refresh resource
    for (auto&& resource : policy_req.resource) {
        io::RemoteFileSystemSPtr cached_fs;
        if (auto cached = get_storage_resource(resource.id); cached) {
            if (cached->second >= resource.version) {
                continue; // stale request, ignore
            }
            cached_fs = std::move(cached->first.fs);
        }

        if (resource.__isset.s3_storage_param) {
            update_s3_resource(resource, std::move(cached_fs));
        } else if (resource.__isset.hdfs_storage_param) {
            update_hdfs_resource(resource, std::move(cached_fs));
        } else {
            LOG(WARNING) << "unknown resource=" << resource;
        }
    }

    // 2. drop storage policy
    for (auto dropped_id : policy_req.dropped_storage_policy) {
        delete_storage_policy(dropped_id);
    }

    // 3. refresh storage policy
    for (auto&& incoming : policy_req.storage_policy) {
        auto current = get_storage_policy(incoming.id);
        const bool is_newer = current == nullptr || current->version < incoming.version;
        if (!is_newer) {
            continue;
        }

        auto policy = std::make_shared<StoragePolicy>();
        policy->name = incoming.name;
        policy->version = incoming.version;
        policy->cooldown_datetime = incoming.cooldown_datetime;
        policy->cooldown_ttl = incoming.cooldown_ttl;
        policy->resource_id = incoming.resource_id;
        LOG_INFO("successfully update storage policy")
                .tag("storage_policy_id", incoming.id)
                .tag("storage_policy", policy->to_string());
        put_storage_policy(incoming.id, std::move(policy));
    }
}
1747
1748
0
void push_index_policy_callback(const TAgentTaskRequest& req) {
1749
0
    const auto& request = req.push_index_policy_req;
1750
0
    doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes(
1751
0
            request.index_policys, request.dropped_index_policys);
1752
0
}
1753
1754
0
// Applies pushed cooldown configurations to local tablets.
//
// Tablets that cannot be found are logged and skipped. For every tablet, the
// (cooldown_term, cooldown_replica_id) pair is applied. An async cooldown-meta write
// is started only when all of these hold:
//  * update_cooldown_conf returned true;
//  * this replica is the designated cooldown replica;
//  * the tablet's cooldown_meta_id is initialized.
void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    for (const auto& item : req.push_cooldown_conf.cooldown_confs) {
        const int64_t tablet_id = item.tablet_id;
        TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
        if (tablet == nullptr) {
            LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
            continue;
        }

        // Evaluated in the same order as before: the conf update always runs first.
        if (!tablet->update_cooldown_conf(item.cooldown_term, item.cooldown_replica_id)) {
            continue;
        }
        if (item.cooldown_replica_id != tablet->replica_id()) {
            continue;
        }
        if (!tablet->tablet_meta()->cooldown_meta_id().initialized()) {
            continue;
        }
        Tablet::async_write_cooldown_meta(tablet);
    }
}
1771
1772
0
// Handles a CREATE tablet task.
//
// The tablet is created through the storage engine. On success, the report version is
// bumped and a TTabletInfo (path hash, replica id, requested version) is reported to
// the FE. Requests slower than config::agent_task_trace_threshold_sec dump their
// RuntimeProfile to the warning log when the function returns.
//
// Previously the post-creation lookup was guarded only by DCHECK, which is compiled
// out in release builds, and then dereferenced unconditionally. A null lookup result
// is now reported as a failed task instead of crashing the BE.
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& create_tablet_req = req.create_tablet_req;
    RuntimeProfile runtime_profile("CreateTablet");
    RuntimeProfile* profile = &runtime_profile;
    MonotonicStopWatch watch;
    watch.start();
    Defer defer = [&] {
        auto elapsed_time = static_cast<double>(watch.elapsed_time());
        if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
#include "common/compile_check_avoid_begin.h"
            COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
#include "common/compile_check_avoid_end.h"
            std::stringstream ss;
            profile->pretty_print(&ss);
            LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str();
        }
    };
    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;

    std::vector<TTabletInfo> finish_tablet_infos;
    VLOG_NOTICE << "create tablet: " << create_tablet_req;
    Status status = engine.create_tablet(create_tablet_req, profile);
    if (status.ok()) {
        increase_report_version();
        // get path hash of the created tablet
        TabletSharedPtr tablet;
        {
            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
            tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
        }
        DCHECK(tablet != nullptr);
        if (tablet == nullptr) {
            // Release builds: report a failure rather than dereference null below.
            status = Status::InternalError("tablet {} not found after it was created",
                                           create_tablet_req.tablet_id);
        } else {
            TTabletInfo tablet_info;
            tablet_info.tablet_id = tablet->tablet_id();
            tablet_info.schema_hash = tablet->schema_hash();
            tablet_info.version = create_tablet_req.version;
            // Useless but it is a required field in TTabletInfo
            tablet_info.version_hash = 0;
            tablet_info.row_count = 0;
            tablet_info.data_size = 0;
            tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
            tablet_info.__set_replica_id(tablet->replica_id());
            finish_tablet_infos.push_back(std::move(tablet_info));
        }
    }

    if (!status.ok()) {
        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id)
                .error(status);
    } else {
        LOG_INFO("successfully create tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id);
    }

    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_report_version(s_report_version);
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());
    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
1835
1836
0
// Handles a DROP tablet task on a local-storage BE.
//
// If the tablet exists, it is dropped through the tablet manager and, on success, its
// related transactions are force-rolled back. A missing tablet yields NotFound. A
// TTabletInfo carrying the requested tablet id and schema hash is always reported to
// the FE together with the status.
//
// Previously a second "successfully drop tablet" line was logged unconditionally,
// even when the drop had failed. Success is now logged only in the success branch.
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& drop_tablet_req = req.drop_tablet_req;
    Status status;
    auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false);
    if (dropped_tablet != nullptr) {
        status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id,
                                                      drop_tablet_req.replica_id,
                                                      drop_tablet_req.is_drop_table_or_partition);
    } else {
        status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id);
    }
    if (status.ok()) {
        // if tablet is dropped by fe, then the related txn should also be removed
        // (dropped_tablet is non-null here: status is only OK when it was found)
        engine.txn_manager()->force_rollback_tablet_related_txns(
                dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
                dropped_tablet->tablet_uid());
        LOG_INFO("successfully drop tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", drop_tablet_req.tablet_id)
                .tag("replica_id", drop_tablet_req.replica_id);
    } else {
        LOG_WARNING("failed to drop tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", drop_tablet_req.tablet_id)
                .tag("replica_id", drop_tablet_req.replica_id)
                .error(status);
    }

    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());

    TTabletInfo tablet_info;
    tablet_info.tablet_id = drop_tablet_req.tablet_id;
    tablet_info.schema_hash = drop_tablet_req.schema_hash;
    tablet_info.version = 0;
    // Useless but it is a required field in TTabletInfo
    tablet_info.version_hash = 0;
    tablet_info.row_count = 0;
    tablet_info.data_size = 0;

    finish_task_request.__set_finish_tablet_infos({tablet_info});

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
1887
1888
0
// Handles a DROP tablet task on a cloud-mode BE.
//
// Looks for the tablet among the tablet manager's live (weakly referenced) tablets and
// recycles the cached data of its snapshot rowsets. The tablet is then erased from the
// tablet manager. Up to 10 of the recycled rowset ids are included in the log line.
// The task is removed from the local task registry on every exit path (via Defer).
// No finish report is sent to the FE here.
void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& drop_tablet_req = req.drop_tablet_req;
    // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe
    Defer defer = [&] { remove_task_info(req.task_type, req.signature); };
    DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
        LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
                .tag("tablet_id", drop_tablet_req.tablet_id);
        return;
    });
    MonotonicStopWatch watch;
    watch.start();

    auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
    std::ostringstream rowset_ids_stream;
    bool found = false;
    for (const auto& weak_tablet : weak_tablets) {
        auto tablet = weak_tablet.lock();
        if (tablet == nullptr || tablet->tablet_id() != drop_tablet_req.tablet_id) {
            continue;
        }

        found = true;
        auto clean_rowsets = tablet->get_snapshot_rowset(true);
        // Log at most the first 10 rowset ids, comma-separated.
        int logged = 0;
        for (const auto& rowset : clean_rowsets) {
            if (logged >= 10) {
                break;
            }
            rowset_ids_stream << (logged == 0 ? "" : ",") << rowset->rowset_id().to_string();
            ++logged;
        }

        CloudTablet::recycle_cached_data(clean_rowsets);
        break;
    }

    if (!found) {
        LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
                     << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
        return;
    }

    engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
    LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
              << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
              << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
}
1938
1939
0
// Executes a PUSH task through EngineBatchLoadTask and reports the outcome to FE.
// On success the report version is bumped and the tablet infos filled in by the load task are
// attached to the finish request; for DELETE pushes the requested version is echoed back too.
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& push_request = req.push_req;

    LOG(INFO) << "get push task. signature=" << req.signature
              << " push_type=" << push_request.push_type;
    std::vector<TTabletInfo> loaded_tablet_infos;

    // EngineBatchLoadTask needs a mutable TPushReq. The request reaches this pool via
    // task_worker_pool <- agent_server <- backend_service <- BackendService, where
    // BackendService_submit_tasks_args.tasks is not const, so casting constness away is safe.
    EngineBatchLoadTask load_task(engine, const_cast<TPushReq&>(push_request),
                                  &loaded_tablet_infos);
    SCOPED_ATTACH_TASK(load_task.mem_tracker());
    auto load_status = load_task.execute();

    // Build the report that is sent back to FE.
    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    const bool is_delete_push = push_request.push_type == TPushType::DELETE;
    if (is_delete_push) {
        finish_req.__set_request_version(push_request.version);
    }

    if (!load_status.ok()) {
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_request.tablet_id)
                .tag("push_type", push_request.push_type)
                .error(load_status);
    } else {
        LOG_INFO("successfully execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_request.tablet_id)
                .tag("push_type", push_request.push_type);
        increase_report_version();
        finish_req.__set_finish_tablet_infos(loaded_tablet_infos);
    }
    finish_req.__set_task_status(load_status.to_thrift());
    // Read after a possible increase_report_version() above.
    finish_req.__set_report_version(s_report_version);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1982
1983
0
// Executes a PUSH task in cloud mode and reports the outcome to FE.
// Only TPushType::DELETE is supported; any other push type is reported back as NotSupported.
// Every path reports to FE via finish_task() and unregisters the task via remove_task_info(),
// so an unsupported request neither leaves FE waiting nor blocks re-submission of its signature.
void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& push_req = req.push_req;

    LOG(INFO) << "get push task. signature=" << req.signature
              << " push_type=" << push_req.push_type;

    // Return result to fe
    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);

    // Only support DELETE in cloud mode now
    if (push_req.push_type != TPushType::DELETE) {
        auto not_supported = Status::NotSupported("push_type {} is not supported",
                                                  std::to_string(push_req.push_type));
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type)
                .error(not_supported);
        finish_task_request.__set_task_status(not_supported.to_thrift());
        finish_task_request.__set_report_version(s_report_version);
        // Previously this branch returned without reporting: FE never saw the failure and the
        // task info registered at submit time was never removed.
        finish_task(finish_task_request);
        remove_task_info(req.task_type, req.signature);
        return;
    }

    finish_task_request.__set_request_version(push_req.version);

    DorisMetrics::instance()->delete_requests_total->increment(1);
    auto st = CloudDeleteTask::execute(engine, req.push_req);
    if (st.ok()) {
        LOG_INFO("successfully execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type);
        increase_report_version();
        auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back();
        // Just need tablet_id
        tablet_info.tablet_id = push_req.tablet_id;
        finish_task_request.__isset.finish_tablet_infos = true;
    } else {
        DorisMetrics::instance()->delete_requests_failed->increment(1);
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type)
                .error(st);
    }

    finish_task_request.__set_task_status(st.to_thrift());
    finish_task_request.__set_report_version(s_report_version);

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
2033
2034
// Worker pool named "PUBLISH_VERSION" sized by config::publish_version_worker_count.
// Every dequeued task is forwarded to publish_version_callback() of this pool.
PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
        : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count,
                         [this](const TAgentTaskRequest& task_req) {
                             this->publish_version_callback(task_req);
                         }),
          _engine(engine) {}

PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
2040
2041
0
void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) {
2042
0
    const auto& publish_version_req = req.publish_version_req;
2043
0
    DorisMetrics::instance()->publish_task_request_total->increment(1);
2044
0
    VLOG_NOTICE << "get publish version task. signature=" << req.signature;
2045
2046
0
    std::set<TTabletId> error_tablet_ids;
2047
0
    std::map<TTabletId, TVersion> succ_tablets;
2048
    // partition_id, tablet_id, publish_version, commit_tso
2049
0
    std::vector<DiscontinuousVersionTablet> discontinuous_version_tablets;
2050
0
    std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
2051
0
    uint32_t retry_time = 0;
2052
0
    Status status;
2053
0
    constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
2054
0
    while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
2055
0
        succ_tablets.clear();
2056
0
        error_tablet_ids.clear();
2057
0
        table_id_to_tablet_id_to_num_delta_rows.clear();
2058
0
        EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
2059
0
                                             &succ_tablets, &discontinuous_version_tablets,
2060
0
                                             &table_id_to_tablet_id_to_num_delta_rows);
2061
0
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2062
0
        status = engine_task.execute();
2063
0
        if (status.ok()) {
2064
0
            break;
2065
0
        }
2066
2067
0
        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
2068
            // there are too many missing versions, it has been be added to async
2069
            // publish task, so no need to retry here.
2070
0
            if (discontinuous_version_tablets.empty()) {
2071
0
                break;
2072
0
            }
2073
0
            LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
2074
0
                                   << "transaction_id: " << publish_version_req.transaction_id;
2075
2076
0
            int64_t time_elapsed = time(nullptr) - req.recv_time;
2077
0
            if (time_elapsed > config::publish_version_task_timeout_s) {
2078
0
                LOG(INFO) << "task elapsed " << time_elapsed
2079
0
                          << " seconds since it is inserted to queue, it is timeout";
2080
0
                break;
2081
0
            }
2082
2083
            // Version not continuous, put to queue and wait pre version publish task execute
2084
0
            PUBLISH_VERSION_count << 1;
2085
0
            auto st = _thread_pool->submit_func([this, req] {
2086
0
                this->publish_version_callback(req);
2087
0
                PUBLISH_VERSION_count << -1;
2088
0
            });
2089
0
            if (!st.ok()) [[unlikely]] {
2090
0
                PUBLISH_VERSION_count << -1;
2091
0
                status = std::move(st);
2092
0
            } else {
2093
0
                return;
2094
0
            }
2095
0
        }
2096
2097
0
        LOG_WARNING("failed to publish version")
2098
0
                .tag("transaction_id", publish_version_req.transaction_id)
2099
0
                .tag("error_tablets_num", error_tablet_ids.size())
2100
0
                .tag("retry_time", retry_time)
2101
0
                .error(status);
2102
0
        ++retry_time;
2103
0
    }
2104
2105
0
    for (auto& item : discontinuous_version_tablets) {
2106
0
        _engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version,
2107
0
                                       publish_version_req.transaction_id, false, item.commit_tso);
2108
0
    }
2109
0
    TFinishTaskRequest finish_task_request;
2110
0
    if (!status.ok()) [[unlikely]] {
2111
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2112
        // if publish failed, return failed, FE will ignore this error and
2113
        // check error tablet ids and FE will also republish this task
2114
0
        LOG_WARNING("failed to publish version")
2115
0
                .tag("signature", req.signature)
2116
0
                .tag("transaction_id", publish_version_req.transaction_id)
2117
0
                .tag("error_tablets_num", error_tablet_ids.size())
2118
0
                .error(status);
2119
0
    } else {
2120
0
        if (!config::disable_auto_compaction &&
2121
0
            (!config::enable_compaction_pause_on_high_memory ||
2122
0
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
2123
0
            for (auto [tablet_id, _] : succ_tablets) {
2124
0
                TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
2125
0
                if (tablet != nullptr) {
2126
0
                    if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
2127
0
                        tablet->published_count.fetch_add(1);
2128
0
                        int64_t published_count = tablet->published_count.load();
2129
0
                        int32_t max_version_config = tablet->max_version_config();
2130
0
                        if (tablet->exceed_version_limit(
2131
0
                                    max_version_config *
2132
0
                                    config::load_trigger_compaction_version_percent / 100) &&
2133
0
                            published_count % 20 == 0) {
2134
0
                            auto st = _engine.submit_compaction_task(
2135
0
                                    tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
2136
0
                            if (!st.ok()) [[unlikely]] {
2137
0
                                LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
2138
0
                                             << ", published=" << published_count << " : " << st;
2139
0
                            } else {
2140
0
                                LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
2141
0
                                          << ", published:" << published_count;
2142
0
                            }
2143
0
                        }
2144
0
                    }
2145
0
                } else {
2146
0
                    LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
2147
0
                }
2148
0
            }
2149
0
        }
2150
0
        int64_t cost_second = time(nullptr) - req.recv_time;
2151
0
        g_publish_version_latency << cost_second;
2152
0
        LOG_INFO("successfully publish version")
2153
0
                .tag("signature", req.signature)
2154
0
                .tag("transaction_id", publish_version_req.transaction_id)
2155
0
                .tag("tablets_num", succ_tablets.size())
2156
0
                .tag("cost(s)", cost_second);
2157
0
    }
2158
2159
0
    status.to_thrift(&finish_task_request.task_status);
2160
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2161
0
    finish_task_request.__set_task_type(req.task_type);
2162
0
    finish_task_request.__set_signature(req.signature);
2163
0
    finish_task_request.__set_report_version(s_report_version);
2164
0
    finish_task_request.__set_succ_tablets(succ_tablets);
2165
0
    finish_task_request.__set_error_tablet_ids(
2166
0
            std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
2167
0
    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
2168
0
            table_id_to_tablet_id_to_num_delta_rows);
2169
0
    finish_task(finish_task_request);
2170
0
    remove_task_info(req.task_type, req.signature);
2171
0
}
2172
2173
0
// Clears the state of a transaction on this BE, either for the listed partitions or, when no
// partition ids are given, for the whole transaction. Non-positive transaction ids are only
// logged. FE always receives an OK status.
void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& clear_req = req.clear_transaction_task_req;
    const auto& partition_ids = clear_req.partition_id;
    LOG(INFO) << "get clear transaction task. signature=" << req.signature
              << ", transaction_id=" << clear_req.transaction_id
              << ", partition_id_size=" << partition_ids.size();

    Status status;

    if (clear_req.transaction_id <= 0) {
        // Nothing to clear for an invalid transaction id.
        LOG(WARNING) << "invalid transaction id " << clear_req.transaction_id
                     << ". signature= " << req.signature;
    } else {
        if (partition_ids.empty()) {
            engine.clear_transaction_task(clear_req.transaction_id);
        } else {
            engine.clear_transaction_task(clear_req.transaction_id, partition_ids);
        }
        LOG(INFO) << "finish to clear transaction task. signature=" << req.signature
                  << ", transaction_id=" << clear_req.transaction_id;
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_task_status(status.to_thrift());
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
2207
2208
0
// Runs an ALTER task (via alter_tablet) and reports to FE, unless the task waited in the queue
// longer than 20 * config::report_task_interval_seconds. A timed-out task is dropped without a
// report. In both cases the fragment-executing counter is decremented, the last-active time is
// refreshed, and the task info is removed.
void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const int64_t signature = req.signature;
    LOG(INFO) << "get alter table task, signature: " << signature;

    bool timed_out = false;
    if (req.__isset.recv_time) {
        const int64_t waited_seconds = time(nullptr) - req.recv_time;
        timed_out = waited_seconds > config::report_task_interval_seconds * 20;
        if (timed_out) {
            LOG(INFO) << "task elapsed " << waited_seconds
                      << " seconds since it is inserted to queue, it is timeout";
        }
    }

    if (!timed_out) {
        TFinishTaskRequest finish_req;
        alter_tablet(engine, req, signature, req.task_type, &finish_req);
        finish_task(finish_req);
    }

    doris::g_fragment_executing_count << -1;
    const int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                   std::chrono::system_clock::now().time_since_epoch())
                                   .count();
    g_fragment_last_active_time.set_value(now_ms);
    remove_task_info(req.task_type, req.signature);
}
2233
2234
0
// Cloud-mode counterpart of alter_tablet_callback. It runs alter_cloud_tablet and reports to FE,
// unless the task waited in the queue longer than 20 * config::report_task_interval_seconds.
// It then resets alter_version to -1 on both the new and the base tablet.
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const int64_t signature = req.signature;
    LOG(INFO) << "get alter table task, signature: " << signature;

    bool timed_out = false;
    if (req.__isset.recv_time) {
        const int64_t waited_seconds = time(nullptr) - req.recv_time;
        timed_out = waited_seconds > config::report_task_interval_seconds * 20;
        if (timed_out) {
            LOG(INFO) << "task elapsed " << waited_seconds
                      << " seconds since it is inserted to queue, it is timeout";
        }
    }

    if (!timed_out) {
        TFinishTaskRequest finish_req;
        alter_cloud_tablet(engine, req, signature, req.task_type, &finish_req);
        finish_task(finish_req);
    }

    doris::g_fragment_executing_count << -1;
    const int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                   std::chrono::system_clock::now().time_since_epoch())
                                   .count();
    g_fragment_last_active_time.set_value(now_ms);

    // alter_version must be reset BEFORE remove_task_info(): once the task info is removed, a
    // re-submitted task with the same signature may set alter_version in its
    // pre_submit_callback, and a later reset here would wipe that value.
    if (req.__isset.alter_tablet_req_v2) {
        const auto& alter_req = req.alter_tablet_req_v2;
        auto new_tablet_res = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
        auto base_tablet_res = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
        auto reset_alter_version = [](auto& tablet_res) {
            if (tablet_res.has_value()) {
                tablet_res.value()->set_alter_version(-1);
            }
        };
        reset_alter_version(new_tablet_res);
        reset_alter_version(base_tablet_res);
    }

    remove_task_info(req.task_type, req.signature);
}
2275
2276
0
// Before an alter task is queued, records alter_req.alter_version on both the new and the base
// tablet. Nothing is done when:
// - the request carries no alter_tablet_req_v2,
// - alter_version <= 1,
// - the new tablet cannot be fetched or is already TABLET_RUNNING, or
// - the base tablet cannot be fetched.
void set_alter_version_before_enqueue(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    if (!req.__isset.alter_tablet_req_v2) {
        return;
    }
    const auto& alter_req = req.alter_tablet_req_v2;
    const auto target_version = alter_req.alter_version;
    if (target_version <= 1) {
        return;
    }

    auto new_tablet_res = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
    const bool new_tablet_not_running =
            new_tablet_res.has_value() && new_tablet_res.value()->tablet_state() != TABLET_RUNNING;
    if (!new_tablet_not_running) {
        return;
    }

    auto base_tablet_res = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
    if (!base_tablet_res.has_value()) {
        return;
    }

    new_tablet_res.value()->set_alter_version(target_version);
    base_tablet_res.value()->set_alter_version(target_version);
    LOG(INFO) << "set alter_version=" << target_version
              << " before enqueue, base_tablet=" << alter_req.base_tablet_id
              << ", new_tablet=" << alter_req.new_tablet_id;
}
2298
2299
0
// Collects (tablet_id -> version) pairs from the GC_BINLOG request and passes them to
// StorageEngine::gc_binlogs. Requests missing gc_binlog_req or its tablet_gc_binlog_infos are
// logged and ignored. If a tablet id repeats, the first entry wins (emplace does not overwrite).
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    if (!req.__isset.gc_binlog_req) {
        LOG(WARNING) << "gc binlog task is not valid";
        return;
    }
    const auto& gc_req = req.gc_binlog_req;
    if (!gc_req.__isset.tablet_gc_binlog_infos) {
        LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
        return;
    }

    std::unordered_map<int64_t, int64_t> gc_tablet_infos;
    for (const auto& binlog_info : gc_req.tablet_gc_binlog_infos) {
        gc_tablet_infos.emplace(binlog_info.tablet_id, binlog_info.version);
    }

    engine.gc_binlogs(gc_tablet_infos);
}
2318
2319
0
// Forwards the partition visible versions carried by the request to the tablet manager.
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& partition_versions = req.visible_version_req.partition_version;
    auto* tablet_mgr = engine.tablet_manager();
    tablet_mgr->update_partitions_visible_version(partition_versions);
}
2324
2325
// Executes a CLONE task through EngineCloneTask and reports the result to FE.
// On success the cloned tablet infos and the copy size/time are attached. If the clone created a
// new tablet, the report version is bumped and included as well.
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
                    const TAgentTaskRequest& req) {
    const auto& clone_request = req.clone_req;

    DorisMetrics::instance()->clone_requests_total->increment(1);
    LOG(INFO) << "get clone task. signature=" << req.signature;

    std::vector<TTabletInfo> cloned_tablet_infos;
    EngineCloneTask clone_task(engine, clone_request, cluster_info, req.signature,
                               &cloned_tablet_infos);
    SCOPED_ATTACH_TASK(clone_task.mem_tracker());
    auto clone_status = clone_task.execute();

    // Return result to fe
    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(clone_status.to_thrift());

    if (clone_status.ok()) {
        LOG_INFO("successfully clone tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", clone_request.tablet_id)
                .tag("copy_size", clone_task.get_copy_size())
                .tag("copy_time_ms", clone_task.get_copy_time_ms());

        if (clone_task.is_new_tablet()) {
            increase_report_version();
            finish_req.__set_report_version(s_report_version);
        }
        finish_req.__set_finish_tablet_infos(cloned_tablet_infos);
        finish_req.__set_copy_size(clone_task.get_copy_size());
        finish_req.__set_copy_time_ms(clone_task.get_copy_time_ms());
    } else {
        DorisMetrics::instance()->clone_requests_failed->increment(1);
        LOG_WARNING("failed to clone tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", clone_request.tablet_id)
                .error(clone_status);
    }

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
2368
2369
0
// Validates a STORAGE_MEDIUM_MIGRATE request and, if valid, migrates the tablet to the selected
// data dir. FILE_ALREADY_EXIST is reported to FE as success.
void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& migrate_req = req.storage_medium_migrate_req;

    // Resolve the tablet and destination store; both are filled in by check_migrate_request.
    TabletSharedPtr tablet;
    DataDir* dest_store = nullptr;
    auto migrate_status = check_migrate_request(engine, migrate_req, tablet, &dest_store);
    if (migrate_status.ok()) {
        // Scope the memory tracker attachment to the migration itself.
        EngineStorageMigrationTask migration_task(engine, tablet, dest_store);
        SCOPED_ATTACH_TASK(migration_task.mem_tracker());
        migrate_status = migration_task.execute();
    }

    // fe should ignore this err
    if (migrate_status.is<FILE_ALREADY_EXIST>()) {
        migrate_status = Status::OK();
    }

    if (migrate_status.ok()) {
        LOG_INFO("successfully migrate storage medium")
                .tag("signature", req.signature)
                .tag("tablet_id", migrate_req.tablet_id);
    } else {
        LOG_WARNING("failed to migrate storage medium")
                .tag("signature", req.signature)
                .tag("tablet_id", migrate_req.tablet_id)
                .error(migrate_status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(migrate_status.to_thrift());

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
2406
2407
0
// Executes a CALCULATE_DELETE_BITMAP task in cloud mode and reports the status to FE.
// The report carries the error tablet ids and echoes back the request's partitions.
// A failure increments publish_task_failed_total.
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& calc_req = req.calc_delete_bitmap_req;
    std::vector<TTabletId> failed_tablet_ids;
    std::vector<TTabletId> succeeded_tablet_ids;

    CloudEngineCalcDeleteBitmapTask calc_task(engine, calc_req, &failed_tablet_ids,
                                              &succeeded_tablet_ids);
    SCOPED_ATTACH_TASK(calc_task.mem_tracker());
    // The signature is not necessarily the transaction id, so log the mapping when they differ.
    if (req.signature != calc_req.transaction_id) {
        LOG_INFO("begin to execute calc delete bitmap task")
                .tag("signature", req.signature)
                .tag("transaction_id", calc_req.transaction_id);
    }
    Status status = calc_task.execute();

    TFinishTaskRequest finish_req;
    if (!status) {
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
        LOG_WARNING("failed to calculate delete bitmap")
                .tag("signature", req.signature)
                .tag("transaction_id", calc_req.transaction_id)
                .tag("error_tablets_num", failed_tablet_ids.size())
                .error(status);
    }

    status.to_thrift(&finish_req.task_status);
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_report_version(s_report_version);
    finish_req.__set_error_tablet_ids(failed_tablet_ids);
    finish_req.__set_resp_partitions(calc_req.partitions);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
2445
2446
void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
2447
0
                                              const TAgentTaskRequest& req) {
2448
0
    if (!config::enable_cloud_make_rs_visible_on_be) {
2449
0
        return;
2450
0
    }
2451
0
    LOG(INFO) << "begin to make cloud tmp rs visible, txn_id="
2452
0
              << req.make_cloud_tmp_rs_visible_req.txn_id
2453
0
              << ", tablet_count=" << req.make_cloud_tmp_rs_visible_req.tablet_ids.size();
2454
2455
0
    const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req;
2456
0
    auto& tablet_mgr = engine.tablet_mgr();
2457
2458
0
    int64_t txn_id = make_visible_req.txn_id;
2459
0
    int64_t version_update_time_ms = make_visible_req.__isset.version_update_time_ms
2460
0
                                             ? make_visible_req.version_update_time_ms
2461
0
                                             : 0;
2462
2463
    // Process each tablet involved in this transaction on this BE
2464
0
    for (int64_t tablet_id : make_visible_req.tablet_ids) {
2465
0
        auto tablet_result =
2466
0
                tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false,
2467
0
                                      /* sync_delete_bitmap */ false,
2468
0
                                      /* sync_stats */ nullptr, /* force_use_only_cached */ true,
2469
0
                                      /* cache_on_miss */ false);
2470
0
        if (!tablet_result.has_value()) {
2471
0
            continue;
2472
0
        }
2473
0
        auto cloud_tablet = tablet_result.value();
2474
2475
0
        int64_t partition_id = cloud_tablet->partition_id();
2476
0
        auto version_iter = make_visible_req.partition_version_map.find(partition_id);
2477
0
        if (version_iter == make_visible_req.partition_version_map.end()) {
2478
0
            continue;
2479
0
        }
2480
0
        int64_t visible_version = version_iter->second;
2481
0
        DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", {
2482
0
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
2483
0
            auto target_table_id = dp->param<int64_t>("table_id", -1);
2484
0
            auto version = dp->param<int64_t>("version", -1);
2485
0
            if ((target_tablet_id == tablet_id || target_table_id == cloud_tablet->table_id()) &&
2486
0
                version == visible_version) {
2487
0
                DBUG_BLOCK
2488
0
            }
2489
0
        });
2490
0
        cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version,
2491
0
                                                    version_update_time_ms);
2492
0
    }
2493
0
    LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id
2494
0
              << ", processed_tablets=" << make_visible_req.tablet_ids.size();
2495
0
}
2496
2497
0
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2498
0
    LOG(INFO) << "clean trash start";
2499
0
    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
2500
0
    static_cast<void>(engine.start_trash_sweep(nullptr, true));
2501
0
    static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
2502
0
    LOG(INFO) << "clean trash finish";
2503
0
}
2504
2505
0
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2506
0
    const auto& clean_req = req.clean_udf_cache_req;
2507
2508
0
    if (doris::config::enable_java_support) {
2509
0
        static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2510
0
    }
2511
2512
0
    if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2513
0
        UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
2514
0
    }
2515
2516
0
    LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
2517
0
}
2518
2519
0
void report_index_policy_callback(const ClusterInfo* cluster_info) {
2520
0
    TReportRequest request;
2521
0
    auto& index_policy_list = request.index_policy;
2522
0
    const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys();
2523
0
    for (const auto& policy : policys) {
2524
0
        index_policy_list.emplace_back(policy.second);
2525
0
    }
2526
0
    request.__isset.index_policy = true;
2527
0
    request.__set_backend(BackendOptions::get_local_backend());
2528
0
    bool succ = handle_report(request, cluster_info, "index_policy");
2529
0
    report_index_policy_total << 1;
2530
0
    if (!succ) [[unlikely]] {
2531
0
        report_index_policy_failed << 1;
2532
0
    }
2533
0
}
2534
2535
} // namespace doris