Coverage Report

Created: 2026-03-23 09:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/task_worker_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/task_worker_pool.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/AgentService_types.h>
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/MasterService_types.h>
26
#include <gen_cpp/Status_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <unistd.h>
29
30
#include <algorithm>
31
// IWYU pragma: no_include <bits/chrono.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
34
#include <atomic>
35
#include <chrono> // IWYU pragma: keep
36
#include <ctime>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <shared_mutex>
41
#include <sstream>
42
#include <string>
43
#include <thread>
44
#include <type_traits>
45
#include <utility>
46
#include <vector>
47
48
#include "agent/utils.h"
49
#include "cloud/cloud_delete_task.h"
50
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
51
#include "cloud/cloud_schema_change_job.h"
52
#include "cloud/cloud_snapshot_loader.h"
53
#include "cloud/cloud_snapshot_mgr.h"
54
#include "cloud/cloud_tablet.h"
55
#include "cloud/cloud_tablet_mgr.h"
56
#include "cloud/config.h"
57
#include "common/config.h"
58
#include "common/logging.h"
59
#include "common/metrics/doris_metrics.h"
60
#include "common/status.h"
61
#include "io/fs/file_system.h"
62
#include "io/fs/hdfs_file_system.h"
63
#include "io/fs/local_file_system.h"
64
#include "io/fs/obj_storage_client.h"
65
#include "io/fs/path.h"
66
#include "io/fs/remote_file_system.h"
67
#include "io/fs/s3_file_system.h"
68
#include "runtime/exec_env.h"
69
#include "runtime/fragment_mgr.h"
70
#include "runtime/index_policy/index_policy_mgr.h"
71
#include "runtime/memory/global_memory_arbitrator.h"
72
#include "runtime/snapshot_loader.h"
73
#include "runtime/user_function_cache.h"
74
#include "service/backend_options.h"
75
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
76
#include "storage/data_dir.h"
77
#include "storage/olap_common.h"
78
#include "storage/rowset/rowset_meta.h"
79
#include "storage/snapshot/snapshot_manager.h"
80
#include "storage/storage_engine.h"
81
#include "storage/storage_policy.h"
82
#include "storage/tablet/tablet.h"
83
#include "storage/tablet/tablet_manager.h"
84
#include "storage/tablet/tablet_meta.h"
85
#include "storage/tablet/tablet_schema.h"
86
#include "storage/task/engine_batch_load_task.h"
87
#include "storage/task/engine_checksum_task.h"
88
#include "storage/task/engine_clone_task.h"
89
#include "storage/task/engine_cloud_index_change_task.h"
90
#include "storage/task/engine_index_change_task.h"
91
#include "storage/task/engine_publish_version_task.h"
92
#include "storage/task/engine_storage_migration_task.h"
93
#include "storage/txn/txn_manager.h"
94
#include "storage/utils.h"
95
#include "util/brpc_client_cache.h"
96
#include "util/debug_points.h"
97
#include "util/jni-util.h"
98
#include "util/mem_info.h"
99
#include "util/random.h"
100
#include "util/s3_util.h"
101
#include "util/stopwatch.hpp"
102
#include "util/threadpool.h"
103
#include "util/time.h"
104
#include "util/trace.h"
105
106
namespace doris {
107
#include "common/compile_check_begin.h"
108
using namespace ErrorCode;
109
110
namespace {
111
112
// Mutex held by the functions in this file whenever they touch s_task_signatures.
std::mutex s_task_signatures_mtx;
113
// Signatures of the tasks currently registered, grouped by task type. Entries are
// inserted by register_task_info() and erased by remove_task_info().
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;
114
115
// Report version copied into finish-task requests. It starts at the process start
// time (seconds) multiplied by 100000 and is bumped by increase_report_version().
std::atomic_ulong s_report_version(time(nullptr) * 100000);
116
117
19.5k
void increase_report_version() {
118
19.5k
    s_report_version.fetch_add(1, std::memory_order_relaxed);
119
19.5k
}
120
121
// FIXME(plat1ko): Paired register and remove task info
122
66.0k
// Registers `signature` as an in-flight task of type `task_type`.
//
// Returns true when the caller may go on with the task, false when the same
// signature is already registered for that task type (a duplicate request).
// Tasks that are never reported, and tasks without a real signature, are not
// tracked at all and always yield true.
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
    // Tasks of these types are not reported, so there is nothing to track.
    const bool untracked_type = task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
                                task_type == TTaskType::type::PUSH_COOLDOWN_CONF ||
                                task_type == TTaskType::type::COMPACTION;
    // -1 marks an uninitialized signature; such tasks are not reported either.
    const bool uninitialized_signature = (signature == -1);
    if (untracked_type || uninitialized_signature) {
        return true;
    }

    std::lock_guard guard(s_task_signatures_mtx);
    const bool newly_inserted = s_task_signatures[task_type].insert(signature).second;
    return newly_inserted;
}
138
139
36.3k
// Drops `signature` from the set of registered tasks of type `task_type` and
// emits a verbose log line with the number of signatures left for that type.
// Erasing a signature that was never registered is a no-op.
void remove_task_info(const TTaskType::type task_type, int64_t signature) {
    // The lock is confined to the lambda so that logging happens outside of it.
    const size_t remaining = [&]() -> size_t {
        std::lock_guard guard(s_task_signatures_mtx);
        auto& signatures = s_task_signatures[task_type];
        signatures.erase(signature);
        return signatures.size();
    }();

    VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
                << ", queue_size=" << remaining;
}
151
152
35.8k
void finish_task(const TFinishTaskRequest& finish_task_request) {
153
    // Return result to FE
154
35.8k
    TMasterResult result;
155
35.8k
    uint32_t try_time = 0;
156
35.8k
    constexpr int TASK_FINISH_MAX_RETRY = 3;
157
36.1k
    while (try_time < TASK_FINISH_MAX_RETRY) {
158
36.1k
        DorisMetrics::instance()->finish_task_requests_total->increment(1);
159
36.1k
        Status client_status =
160
36.1k
                MasterServerClient::instance()->finish_task(finish_task_request, &result);
161
162
36.3k
        if (client_status.ok()) {
163
36.3k
            break;
164
18.4E
        } else {
165
18.4E
            DorisMetrics::instance()->finish_task_requests_failed->increment(1);
166
18.4E
            LOG_WARNING("failed to finish task")
167
18.4E
                    .tag("type", finish_task_request.task_type)
168
18.4E
                    .tag("signature", finish_task_request.signature)
169
18.4E
                    .error(result.status);
170
18.4E
            try_time += 1;
171
18.4E
        }
172
18.4E
        sleep(1);
173
18.4E
    }
174
35.8k
}
175
176
// Stores `tablet_id` and `schema_hash` into `*tablet_info`, then asks the engine's
// tablet manager to fill in the rest via report_tablet_info().
// Returns whatever status report_tablet_info() returns.
Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
                       const TSchemaHash schema_hash, TTabletInfo* tablet_info) {
    TTabletInfo& info = *tablet_info;
    info.__set_tablet_id(tablet_id);
    info.__set_schema_hash(schema_hash);
    return engine.tablet_manager()->report_tablet_info(&info);
}
182
183
1.37k
void random_sleep(int second) {
184
1.37k
    Random rnd(static_cast<uint32_t>(UnixMillis()));
185
1.37k
    sleep(rnd.Uniform(second) + 1);
186
1.37k
}
187
188
// Runs a schema-change ("alter tablet") job on the local storage engine and fills
// `finish_task_request` with the outcome.
//
// Parameters:
//   engine              - local storage engine that owns the tablets.
//   agent_task_req      - task request; only `signature` and `alter_tablet_req_v2`
//                         are read.
//   signature/task_type - echoed into `finish_task_request`.
//   finish_task_request - output; backend, report version, task type, signature and
//                         task status are always set, the tablet info list is set
//                         on the non-error path.
//
// Exceptions of type `Exception` thrown by the job are converted into a Status.
void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature,
                  const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
    std::string_view process_name = "alter tablet";
    const auto& alter_req = agent_task_req.alter_tablet_req_v2;
    const TTabletId new_tablet_id = alter_req.new_tablet_id;
    const TSchemaHash new_schema_hash = alter_req.new_schema_hash;

    Status status;
    {
        // The memory tracker stays attached only for the duration of this scope,
        // i.e. while the schema change job itself runs.
        auto mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::SCHEMA_CHANGE,
                fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                            std::to_string(alter_req.base_tablet_id),
                            std::to_string(alter_req.new_tablet_id)));
        SCOPED_ATTACH_TASK(mem_tracker);
        DorisMetrics::instance()->create_rollup_requests_total->increment(1);
        try {
            LOG_INFO("start {}", process_name)
                    .tag("signature", agent_task_req.signature)
                    .tag("base_tablet_id", alter_req.base_tablet_id)
                    .tag("new_tablet_id", new_tablet_id)
                    .tag("mem_limit",
                         engine.memory_limitation_bytes_per_thread_for_schema_change());
            // A request without a job id is processed under job id "0".
            SchemaChangeJob job(engine, alter_req,
                                std::to_string(alter_req.__isset.job_id ? alter_req.job_id : 0));
            status = job.process_alter_tablet(alter_req);
        } catch (const Exception& e) {
            status = e.to_status();
        }
        if (!status.ok()) {
            DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
        }
    }

    if (status.ok()) {
        increase_report_version();
    }

    // Return result to fe
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
    finish_task_request->__set_report_version(s_report_version);
    finish_task_request->__set_task_type(task_type);
    finish_task_request->__set_signature(signature);

    std::vector<TTabletInfo> finish_tablet_infos;
    if (status.ok()) {
        TTabletInfo tablet_info;
        status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info);
        if (status.ok()) {
            finish_tablet_infos.push_back(tablet_info);
        }
    }

    // NOTE(review): a NOT_IMPLEMENTED_ERROR status takes the "success" branch below
    // (with an empty tablet info list); kept as in the original -- confirm intent.
    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
        LOG_WARNING("failed to {}", process_name)
                .tag("signature", agent_task_req.signature)
                .tag("base_tablet_id", alter_req.base_tablet_id)
                .tag("new_tablet_id", new_tablet_id)
                .error(status);
    } else {
        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
        LOG_INFO("successfully {}", process_name)
                .tag("signature", agent_task_req.signature)
                .tag("base_tablet_id", alter_req.base_tablet_id)
                .tag("new_tablet_id", new_tablet_id);
    }
    finish_task_request->__set_task_status(status.to_thrift());
}
264
265
void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req,
266
                        int64_t signature, const TTaskType::type task_type,
267
10.5k
                        TFinishTaskRequest* finish_task_request) {
268
10.5k
    Status status;
269
270
10.5k
    std::string_view process_name = "alter tablet";
271
    // Check last schema change status, if failed delete tablet file
272
    // Do not need to adjust delete success or not
273
    // Because if delete failed create rollup will failed
274
10.5k
    TTabletId new_tablet_id = 0;
275
10.5k
    new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
276
10.5k
    auto mem_tracker = MemTrackerLimiter::create_shared(
277
10.5k
            MemTrackerLimiter::Type::SCHEMA_CHANGE,
278
10.5k
            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
279
10.5k
                        std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
280
10.5k
                        std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
281
10.5k
                        engine.memory_limitation_bytes_per_thread_for_schema_change()));
282
10.5k
    SCOPED_ATTACH_TASK(mem_tracker);
283
10.5k
    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
284
285
10.5k
    LOG_INFO("start {}", process_name)
286
10.5k
            .tag("signature", agent_task_req.signature)
287
10.5k
            .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
288
10.5k
            .tag("new_tablet_id", new_tablet_id)
289
10.5k
            .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
290
10.5k
    DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
291
10.5k
    CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
292
10.5k
                             agent_task_req.alter_tablet_req_v2.expiration);
293
10.5k
    status = [&]() {
294
10.5k
        HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
295
10.5k
                job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
296
10.5k
                [&](const doris::Exception& ex) {
297
10.5k
                    DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
298
10.5k
                    job.clean_up_on_failure();
299
10.5k
                });
300
10.0k
        return Status::OK();
301
10.5k
    }();
302
303
10.5k
    if (status.ok()) {
304
9.90k
        increase_report_version();
305
9.90k
        LOG_INFO("successfully {}", process_name)
306
9.90k
                .tag("signature", agent_task_req.signature)
307
9.90k
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
308
9.90k
                .tag("new_tablet_id", new_tablet_id);
309
9.90k
    } else {
310
605
        LOG_WARNING("failed to {}", process_name)
311
605
                .tag("signature", agent_task_req.signature)
312
605
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
313
605
                .tag("new_tablet_id", new_tablet_id)
314
605
                .error(status);
315
605
    }
316
317
    // Return result to fe
318
10.5k
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
319
10.5k
    finish_task_request->__set_report_version(s_report_version);
320
10.5k
    finish_task_request->__set_task_type(task_type);
321
10.5k
    finish_task_request->__set_signature(signature);
322
10.5k
    finish_task_request->__set_task_status(status.to_thrift());
323
10.5k
}
324
325
// Validates a storage-medium migration request and resolves its source tablet and
// destination data dir.
//
// Outputs:
//   tablet      - set to the tablet looked up by `req.tablet_id` (may be null on error).
//   *dest_store - set to the destination data dir: the one named by `req.data_dir`
//                 when present, otherwise the first store returned for the
//                 requested storage medium.
//
// Returns an error when the tablet or destination cannot be found, when the
// tablet already lives on the requested medium or path, or when the destination
// reports that it would exceed its capacity limit.
Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req,
                             TabletSharedPtr& tablet, DataDir** dest_store) {
    const int64_t tablet_id = req.tablet_id;
    tablet = engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        return Status::InternalError("could not find tablet {}", tablet_id);
    }

    // Alias for the output slot; every assignment below is visible to the caller.
    DataDir*& dest = *dest_store;
    if (!req.__isset.data_dir) {
        // Only a storage medium was given: pick a data dir of that medium.

        // Nothing to migrate between unless more than one medium type is available.
        uint32_t count = engine.available_storage_medium_type_count();
        if (count <= 1) {
            return Status::InternalError("available storage medium type count is less than 1");
        }

        // The tablet must not already be on the requested medium.
        const TStorageMedium::type wanted_medium = req.storage_medium;
        const TStorageMedium::type current_medium = tablet->data_dir()->storage_medium();
        if (current_medium == wanted_medium) {
            return Status::InternalError("tablet is already on specified storage medium {}",
                                         wanted_medium);
        }

        // Take the first candidate store of the requested medium.
        auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), wanted_medium);
        if (stores.empty()) {
            return Status::InternalError("failed to get root path for create tablet");
        }
        dest = stores[0];
    } else {
        // The request names the destination data dir explicitly.
        dest = engine.get_store(req.data_dir);
        if (dest == nullptr) {
            return Status::InternalError("could not find data dir {}", req.data_dir);
        }
    }

    if (tablet->data_dir()->path() == dest->path()) {
        LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
                                                        tablet->data_dir()->path());
    }

    // check local disk capacity
    const int64_t tablet_size = tablet->tablet_local_size();
    if (dest->reach_capacity_limit(tablet_size)) {
        return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
                                             dest->path(), tablet_size);
    }
    return Status::OK();
}
377
378
// Return `true` if report success
379
bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info,
380
3.02k
                   std::string_view name) {
381
3.02k
    TMasterResult result;
382
3.02k
    Status status = MasterServerClient::instance()->report(request, &result);
383
3.02k
    if (!status.ok()) [[unlikely]] {
384
0
        LOG_WARNING("failed to report {}", name)
385
0
                .tag("host", cluster_info->master_fe_addr.hostname)
386
0
                .tag("port", cluster_info->master_fe_addr.port)
387
0
                .error(status);
388
0
        return false;
389
0
    }
390
391
3.02k
    else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
392
0
        LOG_WARNING("failed to report {}", name)
393
0
                .tag("host", cluster_info->master_fe_addr.hostname)
394
0
                .tag("port", cluster_info->master_fe_addr.port)
395
0
                .error(result.status);
396
0
        return false;
397
0
    }
398
399
3.02k
    return true;
400
3.02k
}
401
402
// Common submission path shared by the worker pools in this file.
//
// Registers the task signature, stamps the receive time on `task`, and hands the
// task to `submit_op`.
//
// Returns:
//   - OK without calling `submit_op` when the same signature is already
//     registered (duplicate request);
//   - the status of `submit_op` when it fails, after unregistering the signature;
//   - OK when the task was submitted.
Status _submit_task(const TAgentTaskRequest& task,
                    std::function<Status(const TAgentTaskRequest&)> submit_op) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;

    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);
    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;

    if (!register_task_info(task_type, signature)) {
        LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
        // Duplicated task request, just return OK
        return Status::OK();
    }

    // TODO(plat1ko): check task request member

    // Set the receiving time of task so that we can determine whether it is timed out later
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
    // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok
    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
    auto st = submit_op(task);
    if (!st.ok()) [[unlikely]] {
        // The task was never handed to a worker, so nothing downstream will ever
        // unregister its signature. Undo the registration here; otherwise a retry
        // with the same signature would be dropped as a duplicate.
        remove_task_info(task_type, signature);
        LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature);
        return st;
    }

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
432
433
// Latency recorder exposed as ("doris_pk", "publish_version").
// NOTE(review): it is not used in the part of the file visible here -- presumably
// fed by the publish-version task; confirm.
bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
434
435
// Per-task-type counters exposed under the "task" prefix. add_task_count() applies
// `<< n` to the counter that matches the task type (n is +1 or -1 at the call
// sites in this file), so each one tracks tasks submitted but not yet finished.
bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX");
436
bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY");
437
bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD");
438
bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD");
439
bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT");
440
bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT");
441
bvar::Adder<uint64_t> MOVE_count("task", "MOVE");
442
bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION");
443
bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY");
444
bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count("task", "PUSH_INDEX_POLICY");
445
bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF");
446
// Counter for TTaskType::CREATE; note the exposed name is "CREATE_TABLE".
bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE");
447
// Counter for TTaskType::DROP; note the exposed name is "DROP_TABLE".
bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE");
448
bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION");
449
bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK");
450
// Updated for PUSH / REALTIME_PUSH tasks whose push type is DELETE.
bvar::Adder<uint64_t> DELETE_count("task", "DELETE");
451
// Updated for PUSH / REALTIME_PUSH tasks whose push type is LOAD_V2.
bvar::Adder<uint64_t> PUSH_count("task", "PUSH");
452
bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO");
453
// Counter for TTaskType::ALTER; note the exposed name is "ALTER_TABLE".
bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
454
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
455
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
456
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
457
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
458
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");
459
460
130k
void add_task_count(const TAgentTaskRequest& task, int n) {
461
    // clang-format off
462
130k
    switch (task.task_type) {
463
0
    #define ADD_TASK_COUNT(type) \
464
50.0k
    case TTaskType::type:        \
465
100k
        type##_count << n;       \
466
50.0k
        return;
467
1.14k
    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
468
0
    ADD_TASK_COUNT(CHECK_CONSISTENCY)
469
32
    ADD_TASK_COUNT(UPLOAD)
470
44
    ADD_TASK_COUNT(DOWNLOAD)
471
640
    ADD_TASK_COUNT(MAKE_SNAPSHOT)
472
640
    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
473
344
    ADD_TASK_COUNT(MOVE)
474
0
    ADD_TASK_COUNT(COMPACTION)
475
20
    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
476
30
    ADD_TASK_COUNT(PUSH_INDEX_POLICY)
477
4
    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
478
12.4k
    ADD_TASK_COUNT(CREATE)
479
6.53k
    ADD_TASK_COUNT(DROP)
480
4.99k
    ADD_TASK_COUNT(PUBLISH_VERSION)
481
28
    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
482
0
    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
483
0
    ADD_TASK_COUNT(CLONE)
484
0
    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
485
0
    ADD_TASK_COUNT(GC_BINLOG)
486
4.85k
    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
487
18.2k
    ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
488
0
    #undef ADD_TASK_COUNT
489
6.49k
    case TTaskType::REALTIME_PUSH:
490
6.49k
    case TTaskType::PUSH:
491
6.49k
        if (task.push_req.push_type == TPushType::LOAD_V2) {
492
0
            PUSH_count << n;
493
6.49k
        } else if (task.push_req.push_type == TPushType::DELETE) {
494
6.49k
            DELETE_count << n;
495
6.49k
        }
496
6.49k
        return;
497
21.0k
    case TTaskType::ALTER:
498
21.0k
    {
499
21.0k
        ALTER_count << n;
500
        // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
501
21.0k
        if (n > 0) {
502
            // only count fragment when task is actually starting
503
10.5k
            doris::g_fragment_executing_count << 1;
504
10.5k
            int64_t now = duration_cast<std::chrono::milliseconds>(
505
10.5k
                                std::chrono::system_clock::now().time_since_epoch())
506
10.5k
                                .count();
507
10.5k
            g_fragment_last_active_time.set_value(now);
508
10.5k
        }
509
21.0k
        return;
510
6.49k
    }
511
52.9k
    default:
512
52.9k
        return;
513
130k
    }
514
    // clang-format on
515
130k
}
516
517
// Counters exposed under the "report" prefix, one total/failed pair per report
// kind (task, disk, tablet, index policy).
// NOTE(review): none of them is updated in the part of the file visible here --
// presumably they are bumped by the report callbacks further down; confirm.
bvar::Adder<uint64_t> report_task_total("report", "task_total");
518
bvar::Adder<uint64_t> report_task_failed("report", "task_failed");
519
bvar::Adder<uint64_t> report_disk_total("report", "disk_total");
520
bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed");
521
bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total");
522
bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
523
bvar::Adder<uint64_t> report_index_policy_total("report", "index_policy_total");
524
bvar::Adder<uint64_t> report_index_policy_failed("report", "index_policy_failed");
525
526
} // namespace
527
528
// Creates a pool named "TaskWP_<name>" with exactly `worker_count` threads
// (min == max).
//
// `callback` is stored as the per-task worker function; `pre_submit_callback` is
// stored as well and may be empty. A failure to build the thread pool aborts the
// process through CHECK.
TaskWorkerPool::TaskWorkerPool(
        std::string_view name, int worker_count,
        std::function<void(const TAgentTaskRequest& task)> callback,
        std::function<void(const TAgentTaskRequest& task)> pre_submit_callback)
        : _callback(std::move(callback)), _pre_submit_callback(std::move(pre_submit_callback)) {
    ThreadPoolBuilder builder(fmt::format("TaskWP_{}", name));
    builder.set_min_threads(worker_count);
    builder.set_max_threads(worker_count);
    auto st = builder.build(&_thread_pool);
    CHECK(st.ok()) << name << ": " << st;
}
539
70
540
70
// Stops the pool on destruction; stop() does nothing if it was already called.
TaskWorkerPool::~TaskWorkerPool() {
    this->stop();
}
543
74
544
4
void TaskWorkerPool::stop() {
545
4
    if (_stopped.exchange(true)) {
546
        return;
547
70
    }
548
70
549
70
    if (_thread_pool) {
550
70
        _thread_pool->shutdown();
551
    }
552
65.8k
}
553
65.8k
554
65.2k
// Submits `task` to the thread pool through the shared _submit_task() path.
//
// Before enqueueing, the optional pre-submit callback runs and the per-type task
// counter is incremented. The worker runs `_callback` on its own copy of the task
// and then decrements the counter again.
// Returns the status of _submit_task(), i.e. OK for duplicates and successful
// submissions, or the thread pool's error when enqueueing failed.
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
    return _submit_task(task, [this](auto&& task) {
        if (_pre_submit_callback) {
            _pre_submit_callback(task);
        }
        add_task_count(task, 1);
        auto st = _thread_pool->submit_func([this, task]() {
            _callback(task);
            add_task_count(task, -1);
        });
        if (!st.ok()) [[unlikely]] {
            // The worker closure will never run, so undo the increment here;
            // otherwise the counter would stay one too high forever.
            // NOTE(review): for ALTER tasks add_task_count(+1) also bumps
            // g_fragment_executing_count, which is not undone here -- confirm
            // where that counter is decremented.
            add_task_count(task, -1);
        }
        return st;
    });
}
566
50
567
37
// Starts `normal_worker_count` threads running normal_loop() (category "Normal")
// followed by `high_prior_worker_count` threads running high_prior_loop()
// (category "HighPrior"). All threads are kept in `_workers`.
// A thread that cannot be created aborts the process through CHECK.
PriorTaskWorkerPool::PriorTaskWorkerPool(
        const std::string& name, int normal_worker_count, int high_prior_worker_count,
        std::function<void(const TAgentTaskRequest& task)> callback)
        : _callback(std::move(callback)) {
    // Spawns `count` threads of one category, each running `loop_body`.
    const auto spawn_workers = [this, &name](const std::string& category, int count,
                                             auto loop_body) {
        for (int i = 0; i < count; ++i) {
            auto st = Thread::create(category, name, loop_body, &_workers.emplace_back());
            CHECK(st.ok()) << name << ": " << st;
        }
    };

    spawn_workers("Normal", normal_worker_count, [this] { normal_loop(); });
    spawn_workers("HighPrior", high_prior_worker_count, [this] { high_prior_loop(); });
}
583
11
584
11
// Stops the pool on destruction; stop() returns early if it already ran.
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
    this->stop();
}
587
4
588
4
void PriorTaskWorkerPool::stop() {
589
    {
590
7
        std::lock_guard lock(_mtx);
591
7
        if (_stopped) {
592
0
            return;
593
7
        }
594
595
38
        _stopped = true;
596
38
    }
597
38
    _normal_condv.notify_all();
598
38
    _high_prior_condv.notify_all();
599
38
600
7
    for (auto&& w : _workers) {
601
        if (w) {
602
6
            w->join();
603
6
        }
604
6
    }
605
6
}
606
6
607
4
// Queues a copy of `task` through the shared _submit_task() path.
//
// Requests with priority HIGH go to the high-priority queue and wake one worker
// of each kind; all other requests go to the normal queue and wake one normal
// worker. The per-type task counter is incremented before queueing.
Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
    return _submit_task(task, [this](const TAgentTaskRequest& incoming) {
        auto req = std::make_unique<TAgentTaskRequest>(incoming);
        add_task_count(*req, 1);
        // Decide before `req` is moved into a queue.
        const bool is_high_prior = req->__isset.priority && req->priority == TPriority::HIGH;

        std::lock_guard guard(_mtx);
        auto& target_queue = is_high_prior ? _high_prior_queue : _normal_queue;
        target_queue.push_back(std::move(req));
        if (is_high_prior) {
            _high_prior_condv.notify_one();
        }
        // Normal workers also serve the high-priority queue, so always wake one.
        _normal_condv.notify_one();
        return Status::OK();
    });
}
624
0
625
0
// Submits a HIGH priority `task`, replacing a pending normal-priority task with
// the same signature if there is one.
//
// Cases:
//   - signature not registered yet: register it, count it, queue it as high priority;
//   - signature registered and still waiting in the normal queue: drop the queued
//     entry and queue this request as high priority instead;
//   - signature registered but not in the normal queue: nothing is queued.
//
// `task` gets its recv_time set. Always returns OK.
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
    const TTaskType::type task_type = task.task_type;
    const int64_t signature = task.signature;
    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);

    // Set the receiving time of task so that we can determine whether it is timed out later.
    // This must happen before the copy below, otherwise the queued request (the one
    // the workers actually see) would carry no receive time.
    task.__set_recv_time(time(nullptr));
    auto req = std::make_unique<TAgentTaskRequest>(task);

    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
    {
        // Lock order: signature registry first, then the queue mutex.
        std::lock_guard lock(s_task_signatures_mtx);
        auto& set = s_task_signatures[task_type];
        if (!set.contains(signature)) {
            // If it doesn't exist, put it directly into the priority queue
            add_task_count(*req, 1);
            set.insert(signature);
            std::lock_guard temp_lock(_mtx);
            _high_prior_queue.push_back(std::move(req));
            _high_prior_condv.notify_one();
            _normal_condv.notify_one();
        } else {
            std::lock_guard temp_lock(_mtx);
            const auto pending = std::find_if(
                    _normal_queue.begin(), _normal_queue.end(),
                    [signature](const auto& queued) { return queued->signature == signature; });
            if (pending != _normal_queue.end()) {
                // If it exists in the normal queue, cancel the task in the normal queue.
                // The task counter is left alone: the cancelled entry was already counted.
                _normal_queue.erase(pending);                // cancel the original task
                _high_prior_queue.push_back(std::move(req)); // add the new task to the queue
                _high_prior_condv.notify_one();
                _normal_condv.notify_one();
            } else {
                // Not waiting in the normal queue, so no operation is needed.
                LOG_INFO("task has already existed in high prior queue.")
                        .tag("signature", signature);
            }
        }
    }

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
670
40
671
40
void PriorTaskWorkerPool::normal_loop() {
672
61
    while (true) {
673
61
        std::unique_ptr<TAgentTaskRequest> req;
674
61
675
        {
676
40
            std::unique_lock lock(_mtx);
677
19
            _normal_condv.wait(lock, [&] {
678
19
                return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped;
679
            });
680
21
681
1
            if (_stopped) {
682
1
                return;
683
20
            }
684
2
685
2
            if (!_high_prior_queue.empty()) {
686
18
                req = std::move(_high_prior_queue.front());
687
18
                _high_prior_queue.pop_front();
688
18
            } else if (!_normal_queue.empty()) {
689
21
                req = std::move(_normal_queue.front());
690
                _normal_queue.pop_front();
691
3
            } else {
692
3
                continue;
693
3
            }
694
37
        }
695
696
37
        _callback(*req);
697
57
        add_task_count(*req, -1);
698
39
    }
699
}
700
39
701
39
void PriorTaskWorkerPool::high_prior_loop() {
702
60
    while (true) {
703
        std::unique_ptr<TAgentTaskRequest> req;
704
39
705
19
        {
706
19
            std::unique_lock lock(_mtx);
707
            _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; });
708
20
709
0
            if (_stopped) {
710
0
                return;
711
            }
712
20
713
20
            if (_high_prior_queue.empty()) {
714
20
                continue;
715
            }
716
0
717
20
            req = std::move(_high_prior_queue.front());
718
20
            _high_prior_queue.pop_front();
719
37
        }
720
721
        _callback(*req);
722
        add_task_count(*req, -1);
723
29
    }
724
29
}
725
29
726
29
// Creates the worker and starts its background thread.
//
// @param name               stored in `_name` and passed to Thread::create.
// @param cluster_info       read each round; no report is sent while
//                           `master_fe_addr.port` is still 0.
// @param report_interval_s  upper bound, in seconds, on the wait between two rounds.
// @param callback           invoked once per round to perform the actual report.
//
// The thread registers this worker as a report listener on the storage engine, loops until
// `_stopped` is set, and deregisters before it exits. A round starts when the interval elapses
// or when `_signal` is raised; the signal is consumed before the callback runs.
// Thread creation failure trips the CHECK.
ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
                           std::function<void()> callback)
        : _name(std::move(name)) {
    auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] {
        auto& storage = ExecEnv::GetInstance()->storage_engine();
        storage.register_report_listener(this);

        for (;;) {
            bool stop_requested = false;
            {
                std::unique_lock guard(_mtx);
                _condv.wait_for(guard, std::chrono::seconds(report_interval_s),
                                [this] { return _stopped || _signal; });

                stop_requested = _stopped;
                if (!stop_requested && _signal) {
                    // Consume received signal
                    _signal = false;
                }
            }

            if (stop_requested) {
                break;
            }

            if (cluster_info->master_fe_addr.port == 0) {
                // port == 0 means not received heartbeat yet
                LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report";
                continue;
            }

            callback();
        }

        storage.deregister_report_listener(this);
    };

    auto create_status = Thread::create("ReportWorker", _name, report_loop, &_thread);
    CHECK(create_status.ok()) << _name << ": " << create_status;
}
762
61
763
61
// Stops the worker on destruction; stop() joins the background thread if one exists.
ReportWorker::~ReportWorker() {
    this->stop();
}
766
61
767
61
void ReportWorker::notify() {
768
61
    {
769
        std::lock_guard lock(_mtx);
770
14
        _signal = true;
771
14
    }
772
14
    _condv.notify_all();
773
14
}
774
1
775
1
void ReportWorker::stop() {
776
    {
777
13
        std::lock_guard lock(_mtx);
778
13
        if (_stopped) {
779
0
            return;
780
13
        }
781
13
782
13
        _stopped = true;
783
13
    }
784
    _condv.notify_all();
785
574
    if (_thread) {
786
574
        _thread->join();
787
574
    }
788
574
}
789
574
790
// Handles an "alter inverted index" agent task in cloud mode.
//
// Looks up the tablet named in req.alter_inverted_index_req. If it is found, an
// EngineCloudIndexChangeTask is executed; otherwise the result is Status::NotFound.
// The outcome is logged, sent back through finish_task(), and the task is removed from the
// local task bookkeeping via remove_task_info().
void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& index_req = req.alter_inverted_index_req;
    LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature
              << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;

    Status result = Status::OK();
    auto tablet_ptr = engine.tablet_mgr().get_tablet(index_req.tablet_id);
    if (tablet_ptr != nullptr) {
        EngineCloudIndexChangeTask change_task(engine, index_req);
        result = change_task.execute();
    } else {
        result = Status::NotFound("could not find tablet {}", index_req.tablet_id);
    }

    // Return result to fe
    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);

    if (result.ok()) {
        LOG(INFO) << "[index_change]successfully alter inverted index task, signature="
                  << req.signature << ", tablet_id=" << index_req.tablet_id
                  << ", job_id=" << index_req.job_id;
    } else {
        LOG(WARNING) << "[index_change]failed to alter inverted index task, signature="
                     << req.signature << ", tablet_id=" << index_req.tablet_id
                     << ", job_id=" << index_req.job_id << ", error=" << result;
    }

    finish_req.__set_task_status(result.to_thrift());
    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
823
0
824
// Handles an "alter inverted index" agent task on the local storage engine.
//
// Looks up the tablet named in req.alter_inverted_index_req. If it is found, an
// EngineIndexChangeTask is executed with its mem tracker attached; otherwise the result is
// Status::NotFound. After a successful change the tablet info is fetched with
// get_tablet_info(), and the status of that fetch becomes the reported task status.
// The fetched info, if any, is attached to the finish request.
// The result is sent through finish_task() and the task is removed via remove_task_info().
void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& index_req = req.alter_inverted_index_req;
    LOG(INFO) << "get alter inverted index task. signature=" << req.signature
              << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;

    Status status = Status::OK();
    auto tablet_ptr = engine.tablet_manager()->get_tablet(index_req.tablet_id);
    if (tablet_ptr == nullptr) {
        status = Status::NotFound("could not find tablet {}", index_req.tablet_id);
    } else {
        EngineIndexChangeTask change_task(engine, index_req);
        SCOPED_ATTACH_TASK(change_task.mem_tracker());
        status = change_task.execute();
    }

    // Return result to fe
    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);

    if (status.ok()) {
        LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature
                  << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id;

        // From here on `status` reflects the tablet info lookup, not the index change.
        std::vector<TTabletInfo> reported_tablets;
        TTabletInfo info;
        status = get_tablet_info(engine, index_req.tablet_id, index_req.schema_hash, &info);
        if (status.ok()) {
            reported_tablets.push_back(info);
        }
        finish_req.__set_finish_tablet_infos(reported_tablets);
    } else {
        LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature
                     << ", tablet_id=" << index_req.tablet_id << ", job_id=" << index_req.job_id
                     << ", error=" << status;
    }

    finish_req.__set_task_status(status.to_thrift());
    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
866
0
867
0
// Handles an "update tablet meta" agent task.
//
// For every entry in req.update_tablet_meta_info_req.tabletMetaInfos the matching local tablet
// is looked up and each field whose `__isset` flag is set is applied to the tablet's meta.
// When at least one persistent field was applied, the meta is saved with tablet->save_meta().
//
// Error handling: a failing entry records its error in `status` and is abandoned with
// `continue` (so save_meta() is not reached for that tablet); the remaining entries are still
// processed. `status` is only ever assigned on failure, so the last failure is what gets
// reported.
//
// When req.signature != -1 the result is sent through finish_task() and the task is removed
// via remove_task_info().
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;

    Status status;
    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
    for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
        auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
        if (tablet == nullptr) {
            status = Status::NotFound("tablet not found");
            LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
                         << tablet_meta_info.tablet_id;
            continue;
        }

        // Guard shared by all time-series tuning fields: they are rejected unless the tablet
        // currently uses the time-series compaction policy. Evaluated at call time, so a policy
        // change applied earlier in this same entry is taken into account.
        const auto require_time_series_policy = [&tablet, &status]() -> bool {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                return false;
            }
            return true;
        };

        bool need_to_save = false;
        if (tablet_meta_info.__isset.partition_id) {
            // for fix partition_id = 0
            LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
                         << ", partition id from : " << tablet->tablet_meta()->partition_id()
                         << " to : " << tablet_meta_info.partition_id;
            auto succ = engine.tablet_manager()->update_tablet_partition_id(
                    tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
            if (!succ) {
                std::string err_msg = fmt::format(
                        "change be tablet id : {} partition_id : {} failed",
                        tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
                LOG(WARNING) << err_msg;
                status = Status::InvalidArgument(err_msg);
                continue;
            }
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.storage_policy_id) {
            tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.is_in_memory) {
            // The flag is mirrored into the meta schema, every rowset schema and the tablet's
            // own schema.
            tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
                    tablet_meta_info.is_in_memory);
            std::shared_lock rlock(tablet->get_header_lock());
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
            }
            tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.compaction_policy) {
            if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY &&
                tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "invalid compaction policy, only support for size_based or "
                        "time_series");
                continue;
            }
            tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
            if (!require_time_series_policy()) {
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
                    tablet_meta_info.time_series_compaction_goal_size_mbytes);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
            if (!require_time_series_policy()) {
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
                    tablet_meta_info.time_series_compaction_file_count_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
            if (!require_time_series_policy()) {
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
                    tablet_meta_info.time_series_compaction_time_threshold_seconds);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
            if (!require_time_series_policy()) {
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
                    tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
            if (!require_time_series_policy()) {
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_level_threshold(
                    tablet_meta_info.time_series_compaction_level_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) {
            tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group(
                    tablet_meta_info.vertical_compaction_num_columns_per_group);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.replica_id) {
            // NOTE(review): unlike the other fields this one does not set need_to_save, so a
            // replica_id-only update is not persisted by this function — confirm intended.
            tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
        }
        if (tablet_meta_info.__isset.binlog_config) {
            // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
            const auto& t_binlog_config = tablet_meta_info.binlog_config;
            if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
                !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) {
                status = Status::InvalidArgument("invalid binlog config, some fields not set");
                LOG(WARNING) << fmt::format(
                        "invalid binlog config, some fields not set, tablet_id={}, "
                        "t_binlog_config={}",
                        tablet_meta_info.tablet_id,
                        apache::thrift::ThriftDebugString(t_binlog_config));
                continue;
            }

            BinlogConfig new_binlog_config;
            new_binlog_config = tablet_meta_info.binlog_config;
            LOG(INFO) << fmt::format(
                    "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
                    "new_binlog_config={}",
                    tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(),
                    new_binlog_config.to_string());
            tablet->set_binlog_config(new_binlog_config);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.enable_single_replica_compaction) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
                    tablet_meta_info.enable_single_replica_compaction);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
                        tablet_meta_info.enable_single_replica_compaction);
            }
            tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
                    tablet_meta_info.enable_single_replica_compaction);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.disable_auto_compaction) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
                    tablet_meta_info.disable_auto_compaction);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_disable_auto_compaction(
                        tablet_meta_info.disable_auto_compaction);
            }
            tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
                    tablet_meta_info.disable_auto_compaction);
            need_to_save = true;
        }

        if (tablet_meta_info.__isset.skip_write_index_on_load) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
                    tablet_meta_info.skip_write_index_on_load);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_skip_write_index_on_load(
                        tablet_meta_info.skip_write_index_on_load);
            }
            tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
                    tablet_meta_info.skip_write_index_on_load);
            need_to_save = true;
        }
        if (need_to_save) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->save_meta();
        }
    }

    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
    if (req.signature != -1) {
        TFinishTaskRequest finish_task_request;
        finish_task_request.__set_task_status(status.to_thrift());
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
        finish_task_request.__set_task_type(req.task_type);
        finish_task_request.__set_signature(req.signature);
        finish_task(finish_task_request);
        remove_task_info(req.task_type, req.signature);
    }
}
1059
0
1060
0
// Handles a "check consistency" agent task.
//
// Runs an EngineChecksumTask for the tablet/schema hash/version named in
// req.check_consistency_req, with the task's mem tracker attached for the rest of the function.
// The resulting status, checksum and request version are reported through finish_task(), and
// the task is removed via remove_task_info().
void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& consistency_req = req.check_consistency_req;

    // Filled in by the checksum task through the pointer passed below.
    uint32_t checksum = 0;
    EngineChecksumTask checksum_task(engine, consistency_req.tablet_id,
                                     consistency_req.schema_hash, consistency_req.version,
                                     &checksum);
    SCOPED_ATTACH_TASK(checksum_task.mem_tracker());
    Status status = checksum_task.execute();
    if (status.ok()) {
        LOG_INFO("successfully check consistency")
                .tag("signature", req.signature)
                .tag("tablet_id", consistency_req.tablet_id)
                .tag("checksum", checksum);
    } else {
        LOG_WARNING("failed to check consistency")
                .tag("signature", req.signature)
                .tag("tablet_id", consistency_req.tablet_id)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());
    finish_req.__set_tablet_checksum(static_cast<int64_t>(checksum));
    finish_req.__set_request_version(consistency_req.version);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1091
1.02k
1092
1.02k
void report_task_callback(const ClusterInfo* cluster_info) {
1093
1.02k
    TReportRequest request;
1094
1.02k
    if (config::report_random_wait) {
1095
1.02k
        random_sleep(5);
1096
5.51k
    }
1097
5.51k
    request.__isset.tasks = true;
1098
4.97M
    {
1099
4.97M
        std::lock_guard lock(s_task_signatures_mtx);
1100
4.97M
        auto& tasks = request.tasks;
1101
5.51k
        for (auto&& [task_type, signatures] : s_task_signatures) {
1102
1.02k
            auto& set = tasks[task_type];
1103
1.02k
            for (auto&& signature : signatures) {
1104
1.02k
                set.insert(signature);
1105
1.02k
            }
1106
1.02k
        }
1107
1.02k
    }
1108
0
    request.__set_backend(BackendOptions::get_local_backend());
1109
0
    request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
1110
1.02k
    bool succ = handle_report(request, cluster_info, "task");
1111
    report_task_total << 1;
1112
309
    if (!succ) [[unlikely]] {
1113
309
        report_task_failed << 1;
1114
309
    }
1115
309
}
1116
1117
309
// Reports per-data-dir disk usage plus CPU information to the frontend.
//
// One TDisk entry is produced for every DataDirInfo returned by
// engine.get_all_data_dir_info(), keyed by root path; the status of that call is deliberately
// ignored. The pipeline executor size falls back to the core count when the config value is
// not positive. Bumps `report_disk_total` on every call and `report_disk_failed` when
// handle_report() returns false.
void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.disks = true;

    std::vector<DataDirInfo> dir_infos;
    static_cast<void>(engine.get_all_data_dir_info(&dir_infos, true /* update */));

    for (auto& dir : dir_infos) {
        TDisk entry;
        entry.__set_root_path(dir.path);
        entry.__set_path_hash(dir.path_hash);
        entry.__set_storage_medium(dir.storage_medium);
        entry.__set_disk_total_capacity(dir.disk_capacity);
        entry.__set_data_used_capacity(dir.local_used_capacity);
        entry.__set_remote_used_capacity(dir.remote_used_capacity);
        entry.__set_disk_available_capacity(dir.available);
        entry.__set_trash_used_capacity(dir.trash_used_capacity);
        entry.__set_used(dir.is_used);
        request.disks[dir.path] = entry;
    }

    request.__set_num_cores(CpuInfo::num_cores());
    const auto executor_size = config::pipeline_executor_size > 0
                                       ? config::pipeline_executor_size
                                       : CpuInfo::num_cores();
    request.__set_pipeline_executor_size(executor_size);

    const bool reported = handle_report(request, cluster_info, "disk");
    report_disk_total << 1;
    if (!reported) [[unlikely]] {
        report_disk_failed << 1;
    }
}
1148
129
1149
129
// Cloud-mode disk report: currently only sends CPU information.
//
// No per-disk entries are added; `engine` is unused for now. The pipeline executor size falls
// back to the core count when the config value is not positive. Bumps `report_disk_total` on
// every call and streams `!succ` into `report_disk_failed`.
void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
    // Optional random sleep before reporting, so that the FE is not hit by many report
    // requests at the same time.
    if (config::report_random_wait) {
        random_sleep(5);
    }
    static_cast<void>(engine); // To be used in the future

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.disks = true;

    // TODO(deardeng): report disk info in cloud mode. And make it more clear
    //                 that report CPU by using a separate report procedure
    //                 or abstracting disk report as "host info report"
    request.__set_num_cores(CpuInfo::num_cores());
    const auto executor_size = config::pipeline_executor_size > 0
                                       ? config::pipeline_executor_size
                                       : CpuInfo::num_cores();
    request.__set_pipeline_executor_size(executor_size);

    const bool reported = handle_report(request, cluster_info, "disk");
    report_disk_total << 1;
    report_disk_failed << !reported;
}
1173
1174
149
// Reports all tablets of the local storage engine to the frontend.
//
// The report version is bumped first. The tablet list is then rebuilt up to five times until
// `s_report_version` is unchanged across a build. If the version still moved, the report is
// skipped and `report_all_tablets_requests_skip` is incremented. Otherwise the request also
// carries partition visible versions, the larger of the cumulative/base max compaction scores,
// and the ids/versions of storage policies and storage resources. Bumps `report_tablet_total`
// for every report actually sent and `report_tablet_failed` when handle_report() returns false.
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
    if (config::report_random_wait) {
        random_sleep(5);
    }

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.tablets = true;

    increase_report_version();

    constexpr int kMaxBuildAttempts = 5;
    uint64_t report_version = 0;
    int attempt = 0;
    do {
        request.tablets.clear();
        report_version = s_report_version;
        engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
        // Stop as soon as the version did not move while the list was being built.
    } while (report_version != s_report_version && ++attempt < kMaxBuildAttempts);

    if (report_version < s_report_version) {
        // TODO llj This can only reduce the possibility for report error, but can't avoid it.
        // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not
        // be included in this report, and the report version has a small probability that it has
        // not been updated in time. When FE receives this report, it is possible to delete the
        // new tablet.
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
        return;
    }

    std::map<int64_t, int64_t> visible_versions;
    engine.tablet_manager()->get_partitions_visible_version(&visible_versions);
    request.__set_partitions_version(std::move(visible_versions));

    int64_t max_compaction_score =
            std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
                     DorisMetrics::instance()->tablet_base_max_compaction_score->value());
    request.__set_tablet_max_compaction_score(max_compaction_score);
    request.__set_report_version(report_version);

    // report storage policy and resource
    for (auto [id, version] : get_storage_policy_ids()) {
        auto& policy = request.storage_policy.emplace_back();
        policy.__set_id(id);
        policy.__set_version(version);
    }
    request.__isset.storage_policy = true;

    for (auto [id_str, version] : get_storage_resource_ids()) {
        // The entry is appended before the id is parsed, so an unparsable id leaves an entry
        // with neither id nor version set.
        auto& resource = request.resource.emplace_back();
        int64_t id = -1;
        const auto parsed = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
        if (parsed.ec != std::errc {}) [[unlikely]] {
            LOG(ERROR) << "invalid resource id format: " << id_str;
        } else {
            resource.__set_id(id);
            resource.__set_version(version);
        }
    }
    request.__isset.resource = true;

    const bool reported = handle_report(request, cluster_info, "tablet");
    report_tablet_total << 1;
    if (!reported) [[unlikely]] {
        report_tablet_failed << 1;
    }
}
1242
68
1243
68
// Reports the tablets known to the cloud storage engine to the frontend.
//
// The report version is bumped first. The tablet list and total tablet count are then rebuilt
// up to five times until `s_report_version` is unchanged across a build. If the version still
// moved, the report is skipped and `report_all_tablets_requests_skip` is incremented.
// Bumps `report_tablet_total` for every report actually sent and `report_tablet_failed` when
// handle_report() returns false.
void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
    // Optional random sleep before reporting, so that the FE is not hit by many report
    // requests at the same time.
    if (config::report_random_wait) {
        random_sleep(5);
    }

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.tablets = true;

    increase_report_version();

    constexpr int kMaxBuildAttempts = 5;
    uint64_t report_version = 0;
    uint64_t total_num_tablets = 0;
    int attempt = 0;
    do {
        request.tablets.clear();
        report_version = s_report_version;
        engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
        // Stop as soon as the version did not move while the list was being built.
    } while (report_version != s_report_version && ++attempt < kMaxBuildAttempts);

    if (report_version < s_report_version) {
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
        return;
    }

    request.__set_report_version(report_version);
    request.__set_num_tablets(total_num_tablets);

    const bool reported = handle_report(request, cluster_info, "tablet");
    report_tablet_total << 1;
    if (!reported) [[unlikely]] {
        report_tablet_failed << 1;
    }
}
1282
16
1283
// Handles an "upload" agent task.
//
// Builds a SnapshotLoader from req.upload_req, attaches its resource context, initialises it
// (storage backend defaults to BROKER and location to "" when not set in the request) and,
// if that succeeds, uploads the files in `src_dest_map`. The status and the collected
// per-tablet file lists are reported through finish_task(), and the task is removed via
// remove_task_info().
void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& upload_req = req.upload_req;

    LOG(INFO) << "get upload task. signature=" << req.signature
              << ", job_id=" << upload_req.job_id;

    // tablet id -> files, filled in by SnapshotLoader::upload().
    std::map<int64_t, std::vector<std::string>> tablet_files;
    std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
            engine, env, upload_req.job_id, req.signature, upload_req.broker_addr,
            upload_req.broker_prop);
    SCOPED_ATTACH_TASK(loader->resource_ctx());

    Status status = loader->init(
            upload_req.__isset.storage_backend ? upload_req.storage_backend
                                               : TStorageBackendType::type::BROKER,
            upload_req.__isset.location ? upload_req.location : "");
    if (status.ok()) {
        status = loader->upload(upload_req.src_dest_map, &tablet_files);
    }

    if (status.ok()) {
        LOG_INFO("successfully upload")
                .tag("signature", req.signature)
                .tag("job_id", upload_req.job_id);
    } else {
        LOG_WARNING("failed to upload")
                .tag("signature", req.signature)
                .tag("job_id", upload_req.job_id)
                .error(status);
    }

    TFinishTaskRequest finish_req;
    finish_req.__set_backend(BackendOptions::get_local_backend());
    finish_req.__set_task_type(req.task_type);
    finish_req.__set_signature(req.signature);
    finish_req.__set_task_status(status.to_thrift());
    finish_req.__set_tablet_files(tablet_files);

    finish_task(finish_req);
    remove_task_info(req.task_type, req.signature);
}
1323
22
1324
// Handles a download agent task for the local StorageEngine.
//
// When the request carries `remote_tablet_snapshots`, the files are fetched with
// SnapshotLoader::remote_http_download(). Otherwise a loader bound to the request's broker
// address/properties is initialised (the storage backend falls back to BROKER and the location
// to "" when the request does not set them) and SnapshotLoader::download() is used.
// Either way the resulting status and the tablet ids filled in by the loader are sent back
// through finish_task(), followed by remove_task_info() for this task.
void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& request = req.download_req;
    LOG(INFO) << "get download task. signature=" << req.signature
              << ", job_id=" << request.job_id
              << ", task detail: " << apache::thrift::ThriftDebugString(request);

    std::vector<int64_t> finished_tablet_ids;
    Status status = Status::OK();

    if (!request.__isset.remote_tablet_snapshots) {
        // Broker / object-storage path: init() must succeed before download() is attempted.
        auto loader = std::make_unique<SnapshotLoader>(engine, env, request.job_id, req.signature,
                                                       request.broker_addr, request.broker_prop);
        const auto backend_type = request.__isset.storage_backend
                                          ? request.storage_backend
                                          : TStorageBackendType::type::BROKER;
        const std::string location = request.__isset.location ? request.location : "";
        status = loader->init(backend_type, location);
        if (status.ok()) {
            status = loader->download(request.src_dest_map, &finished_tablet_ids);
        }
    } else {
        // HTTP path: no broker information is handed to the loader.
        auto loader =
                std::make_unique<SnapshotLoader>(engine, env, request.job_id, req.signature);
        status = loader->remote_http_download(request.remote_tablet_snapshots,
                                              &finished_tablet_ids);
    }

    if (status.ok()) {
        LOG_INFO("successfully download")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id);
    } else {
        LOG_WARNING("failed to download")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id)
                .error(status);
    }

    // Report the result regardless of success or failure.
    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_downloaded_tablet_ids(finished_tablet_ids);
    finish_request.__set_task_status(status.to_thrift());

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1373
0
1374
// Handles a download agent task for the CloudStorageEngine.
//
// Remote tablet snapshots are not supported in cloud mode: such a request is answered with a
// NOT_IMPLEMENTED_ERROR status. Otherwise a CloudSnapshotLoader is initialised with the
// requested storage backend (BROKER when unset), location ("" when unset) and vault id, and
// its download() fills `transferred_tablet_ids`.
//
// The outcome is always logged and reported through finish_task(), and remove_task_info() is
// always called. Previously these steps lived inside the supported branch only, so an
// unsupported request was neither reported nor removed from the task info.
void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& download_request = req.download_req;
    LOG(INFO) << "get download task. signature=" << req.signature
              << ", job_id=" << download_request.job_id
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);

    std::vector<int64_t> transferred_tablet_ids;

    auto status = Status::OK();
    if (download_request.__isset.remote_tablet_snapshots) {
        status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
                "remote tablet snapshot is not supported.");
    } else {
        std::unique_ptr<CloudSnapshotLoader> loader = std::make_unique<CloudSnapshotLoader>(
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
                download_request.broker_prop);
        status = loader->init(download_request.__isset.storage_backend
                                      ? download_request.storage_backend
                                      : TStorageBackendType::type::BROKER,
                              download_request.__isset.location ? download_request.location : "",
                              download_request.vault_id);
        // Only attempt the transfer when the loader was set up successfully.
        if (status.ok()) {
            status = loader->download(download_request.src_dest_map, &transferred_tablet_ids);
        }
    }

    if (!status.ok()) {
        LOG_WARNING("failed to download")
                .tag("signature", req.signature)
                .tag("job_id", download_request.job_id)
                .error(status);
    } else {
        LOG_INFO("successfully download")
                .tag("signature", req.signature)
                .tag("job_id", download_request.job_id);
    }

    // Report every outcome, including the unsupported-request error above.
    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());
    finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids);

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
1421
1422
320
// Handles a make-snapshot agent task.
//
// Asks the snapshot manager to create the snapshot and, when the request sets `list_files`,
// lists the files under <snapshot_path>/<tablet_id>/<schema_hash>/ on the local filesystem.
// The snapshot path, the collected file names and the final status are reported through
// finish_task(); remove_task_info() is called afterwards.
void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& request = req.snapshot_req;

    LOG(INFO) << "get snapshot task. signature=" << req.signature;

    std::string snapshot_path;
    std::vector<std::string> snapshot_files;
    // Out-parameter required by make_snapshot(); its value is not used here.
    bool allow_incremental_clone = false;
    Status status = engine.snapshot_mgr()->make_snapshot(request, &snapshot_path,
                                                         &allow_incremental_clone);

    const bool need_file_list = status.ok() && request.__isset.list_files;
    if (need_file_list) {
        // snapshot_path looks like data/snapshot/20180417205230.1.86400, the snapshot files
        // are located in its tablet_id/schema_hash/ sub-directory.
        const io::Path tablet_dir = fmt::format("{}/{}/{}/", snapshot_path, request.tablet_id,
                                                request.schema_hash);
        std::vector<io::FileInfo> listed_files;
        bool exists = true;
        status = io::global_local_filesystem()->list(tablet_dir, true, &listed_files, &exists);
        if (status.ok()) {
            for (const auto& info : listed_files) {
                snapshot_files.push_back(info.file_name);
            }
        }
    }

    if (status.ok()) {
        LOG_INFO("successfully make snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("version", request.version)
                .tag("snapshot_path", snapshot_path);
    } else {
        LOG_WARNING("failed to make snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("version", request.version)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_task_status(status.to_thrift());
    finish_request.__set_snapshot_path(snapshot_path);
    finish_request.__set_snapshot_files(snapshot_files);

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1472
1473
320
// Handles a release-snapshot agent task for the local StorageEngine: releases the snapshot at
// the path named in the request, logs the outcome, reports the status through finish_task()
// and finally calls remove_task_info().
void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;

    const std::string& snapshot_path = req.release_snapshot_req.snapshot_path;
    Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);

    if (status.ok()) {
        LOG_INFO("successfully release snapshot")
                .tag("signature", req.signature)
                .tag("snapshot_path", snapshot_path);
    } else {
        LOG_WARNING("failed to release snapshot")
                .tag("signature", req.signature)
                .tag("snapshot_path", snapshot_path)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_task_status(status.to_thrift());

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1500
1501
0
// Handles a release-snapshot agent task for the CloudStorageEngine. The cloud snapshot manager
// is addressed by tablet id plus the request's `is_job_completed` flag instead of a snapshot
// path. The outcome is logged, reported through finish_task(), and remove_task_info() is
// called afterwards.
void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& request = req.release_snapshot_req;

    LOG(INFO) << "get release snapshot task. signature=" << req.signature;

    Status status = engine.cloud_snapshot_mgr().release_snapshot(request.tablet_id,
                                                                 request.is_job_completed);

    if (status.ok()) {
        LOG_INFO("successfully release snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("is_job_completed", request.is_job_completed);
    } else {
        LOG_WARNING("failed to release snapshot")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("is_job_completed", request.is_job_completed)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_task_status(status.to_thrift());

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1531
172
1532
172
// Handles a move-dir agent task for the local StorageEngine.
//
// Looks up the target tablet and, if it exists, lets a SnapshotLoader move the directory
// `move_dir_req.src` into it (third argument `true`). A missing tablet yields an
// InvalidArgument status. The status is logged, reported through finish_task(), and
// remove_task_info() is called afterwards.
void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& request = req.move_dir_req;

    LOG(INFO) << "get move dir task. signature=" << req.signature
              << ", job_id=" << request.job_id;

    Status status;
    auto tablet = engine.tablet_manager()->get_tablet(request.tablet_id);
    if (tablet != nullptr) {
        // NOTE(review): the tablet id is passed in the loader's fourth constructor argument,
        // where the download/upload callbacks pass req.signature -- confirm this is intended.
        SnapshotLoader loader(engine, env, request.job_id, request.tablet_id);
        status = loader.move(request.src, tablet, true);
    } else {
        status = Status::InvalidArgument("Could not find tablet");
    }

    if (status.ok()) {
        LOG_INFO("successfully move dir")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id)
                .tag("tablet_id", request.tablet_id)
                .tag("src", request.src);
    } else {
        LOG_WARNING("failed to move dir")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id)
                .tag("tablet_id", request.tablet_id)
                .tag("src", request.src)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_task_status(status.to_thrift());

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1570
0
1571
// Handles a move-dir agent task for the CloudStorageEngine. No directory is moved here: the
// task is carried out by calling commit_snapshot() on the cloud snapshot manager for the
// request's tablet. `env` is not used. The status is logged, reported through finish_task(),
// and remove_task_info() is called afterwards.
void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
    const auto& request = req.move_dir_req;

    LOG(INFO) << "get move dir task. signature=" << req.signature
              << ", job_id=" << request.job_id;

    Status status = engine.cloud_snapshot_mgr().commit_snapshot(request.tablet_id);

    if (status.ok()) {
        LOG_INFO("successfully move dir")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id)
                .tag("tablet_id", request.tablet_id);
    } else {
        LOG_WARNING("failed to move dir")
                .tag("signature", req.signature)
                .tag("job_id", request.job_id)
                .tag("tablet_id", request.tablet_id)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_task_status(status.to_thrift());

    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1600
0
1601
// Handles a compaction agent task: submits a base compaction when the request type is "base"
// and a cumulative compaction for any other type string.
//
// Nothing is submitted when the tablet cannot be found (silently) or when the tablet reports
// that it cannot run this compaction type (logged as a warning). This callback does not call
// finish_task() or remove_task_info() itself.
void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& request = req.compaction_req;

    LOG(INFO) << "get compaction task. signature=" << req.signature
              << ", compaction_type=" << request.type;

    CompactionType compaction_type = (request.type == "base")
                                             ? CompactionType::BASE_COMPACTION
                                             : CompactionType::CUMULATIVE_COMPACTION;

    auto tablet = engine.tablet_manager()->get_tablet(request.tablet_id);
    if (tablet == nullptr) {
        return;
    }

    auto* data_dir = tablet->data_dir();
    if (!tablet->can_do_compaction(data_dir->path_hash(), compaction_type)) {
        LOG(WARNING) << "could not do compaction. tablet_id=" << tablet->tablet_id()
                     << ", compaction_type=" << compaction_type;
        return;
    }

    Status status = engine.submit_compaction_task(tablet, compaction_type, false);
    if (!status.ok()) {
        LOG(WARNING) << "failed to submit table compaction task. error=" << status;
    }
}
1629
34
1630
namespace {
1631
34
1632
void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1633
34
    Status st;
1634
34
    io::RemoteFileSystemSPtr fs;
1635
34
1636
8
    if (!existed_fs) {
1637
26
        // No such FS instance on BE
1638
26
        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
1639
26
                                            std::to_string(param.id));
1640
34
        if (!res.has_value()) {
1641
0
            st = std::move(res).error();
1642
0
        } else {
1643
0
            fs = std::move(res).value();
1644
0
        }
1645
0
    } else {
1646
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
1647
0
        auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
1648
        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
1649
34
        S3ClientConf conf = std::move(new_s3_conf.client_conf);
1650
8
        st = client->reset(conf);
1651
26
        fs = std::move(existed_fs);
1652
26
    }
1653
26
1654
26
    if (!st.ok()) {
1655
26
        LOG(WARNING) << "update s3 resource failed: " << st;
1656
26
    } else {
1657
34
        LOG_INFO("successfully update s3 resource")
1658
                .tag("resource_id", param.id)
1659
38
                .tag("resource_name", param.name);
1660
38
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1661
38
    }
1662
38
}
1663
38
1664
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1665
38
    Status st;
1666
    io::RemoteFileSystemSPtr fs;
1667
38
    std::string root_path =
1668
38
            param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";
1669
38
1670
38
    if (!existed_fs) {
1671
16
        // No such FS instance on BE
1672
22
        auto res = io::HdfsFileSystem::create(
1673
22
                param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
1674
22
                std::to_string(param.id), nullptr, std::move(root_path));
1675
        if (!res.has_value()) {
1676
38
            st = std::move(res).error();
1677
0
        } else {
1678
            fs = std::move(res).value();
1679
0
        }
1680
0
1681
    } else {
1682
38
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
1683
16
        // TODO(plat1ko): update hdfs conf
1684
22
        fs = std::move(existed_fs);
1685
22
    }
1686
22
1687
22
    if (!st.ok()) {
1688
22
        LOG(WARNING) << "update hdfs resource failed: " << st;
1689
22
    } else {
1690
22
        LOG_INFO("successfully update hdfs resource")
1691
38
                .tag("resource_id", param.id)
1692
                .tag("resource_name", param.name)
1693
                .tag("root_path", fs->root_path().string());
1694
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1695
10
    }
1696
10
}
1697
1698
72
} // namespace
1699
72
1700
72
// Applies a pushed storage-policy update in three steps:
//   1. refresh every storage resource whose pushed version is newer than the registered one
//      (S3 or HDFS, depending on which parameter set is present in the request),
//   2. delete the storage policies listed as dropped,
//   3. register every storage policy that is new or has a higher version than the known one.
// `engine` is not used by this callback.
void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& policy_req = req.push_storage_policy_req;

    // refresh resource
    for (auto&& resource : policy_req.resource) {
        io::RemoteFileSystemSPtr current_fs;
        if (auto known = get_storage_resource(resource.id); known) {
            if (known->second >= resource.version) {
                // Stale request, ignore
                continue;
            }
            // Hand the already registered file system to the updater so it can be reused.
            current_fs = std::move(known->first.fs);
        }

        if (resource.__isset.s3_storage_param) {
            update_s3_resource(resource, std::move(current_fs));
        } else if (resource.__isset.hdfs_storage_param) {
            update_hdfs_resource(resource, std::move(current_fs));
        } else {
            LOG(WARNING) << "unknown resource=" << resource;
        }
    }

    // drop storage policy
    for (auto dropped_id : policy_req.dropped_storage_policy) {
        delete_storage_policy(dropped_id);
    }

    // refresh storage policy
    for (auto&& incoming : policy_req.storage_policy) {
        auto current = get_storage_policy(incoming.id);
        const bool is_newer = current == nullptr || current->version < incoming.version;
        if (!is_newer) {
            continue;
        }

        auto refreshed = std::make_shared<StoragePolicy>();
        refreshed->name = incoming.name;
        refreshed->version = incoming.version;
        refreshed->cooldown_datetime = incoming.cooldown_datetime;
        refreshed->cooldown_ttl = incoming.cooldown_ttl;
        refreshed->resource_id = incoming.resource_id;
        LOG_INFO("successfully update storage policy")
                .tag("storage_policy_id", incoming.id)
                .tag("storage_policy", refreshed->to_string());
        put_storage_policy(incoming.id, std::move(refreshed));
    }
}
1744
15
1745
void push_index_policy_callback(const TAgentTaskRequest& req) {
1746
2
    const auto& request = req.push_index_policy_req;
1747
2
    doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes(
1748
326
            request.index_policys, request.dropped_index_policys);
1749
326
}
1750
326
1751
326
// Applies pushed cooldown configurations tablet by tablet.
//
// For every entry the tablet's cooldown term/replica is updated. The cooldown meta is written
// asynchronously only when that update returned true, this replica is the configured cooldown
// replica, and the tablet already has an initialized cooldown meta id. Unknown tablets are
// skipped with a warning.
void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& conf_req = req.push_cooldown_conf;
    for (const auto& conf : conf_req.cooldown_confs) {
        const int64_t tablet_id = conf.tablet_id;
        TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
        if (tablet == nullptr) {
            LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
            continue;
        }

        // The update must be attempted first; the remaining checks only matter if it applied.
        if (!tablet->update_cooldown_conf(conf.cooldown_term, conf.cooldown_replica_id)) {
            continue;
        }
        if (conf.cooldown_replica_id != tablet->replica_id()) {
            continue;
        }
        if (!tablet->tablet_meta()->cooldown_meta_id().initialized()) {
            continue;
        }
        Tablet::async_write_cooldown_meta(tablet);
    }
}
1768
6.23k
1769
6.23k
// Handles a create-tablet agent task.
//
// Creates the tablet through the storage engine while collecting a runtime profile. On success
// the report version is increased and a TTabletInfo describing the new tablet is attached to
// the finish request; on failure the failure metric is bumped and a warning is logged. The
// result is always reported through finish_task(), followed by remove_task_info().
// When the whole callback takes longer than config::agent_task_trace_threshold_sec seconds,
// the profile is dumped to the warning log on exit.
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& create_tablet_req = req.create_tablet_req;
    RuntimeProfile runtime_profile("CreateTablet");
    RuntimeProfile* profile = &runtime_profile;
    MonotonicStopWatch stopwatch;
    stopwatch.start();
    // Runs when the function returns, i.e. after the task result has been reported.
    Defer report_slow_task = [&] {
        const auto elapsed = static_cast<double>(stopwatch.elapsed_time());
        if (elapsed / 1e9 > config::agent_task_trace_threshold_sec) {
            COUNTER_UPDATE(profile->total_time_counter(), elapsed);
            std::stringstream profile_dump;
            profile->pretty_print(&profile_dump);
            LOG(WARNING) << "create tablet cost(s) " << elapsed / 1e9 << std::endl
                         << profile_dump.str();
        }
    };
    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;

    std::vector<TTabletInfo> finish_tablet_infos;
    VLOG_NOTICE << "create tablet: " << create_tablet_req;
    Status status = engine.create_tablet(create_tablet_req, profile);
    if (status.ok()) {
        increase_report_version();
        // get path hash of the created tablet
        TabletSharedPtr tablet;
        {
            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
            tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
        }
        DCHECK(tablet != nullptr);
        TTabletInfo created_info;
        created_info.tablet_id = tablet->tablet_id();
        created_info.schema_hash = tablet->schema_hash();
        created_info.version = create_tablet_req.version;
        // Useless but it is a required field in TTabletInfo
        created_info.version_hash = 0;
        created_info.row_count = 0;
        created_info.data_size = 0;
        created_info.__set_path_hash(tablet->data_dir()->path_hash());
        created_info.__set_replica_id(tablet->replica_id());
        finish_tablet_infos.push_back(created_info);
        LOG_INFO("successfully create tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id);
    } else {
        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id)
                .error(status);
    }

    TFinishTaskRequest finish_request;
    finish_request.__set_finish_tablet_infos(finish_tablet_infos);
    finish_request.__set_backend(BackendOptions::get_local_backend());
    finish_request.__set_report_version(s_report_version);
    finish_request.__set_task_type(req.task_type);
    finish_request.__set_signature(req.signature);
    finish_request.__set_task_status(status.to_thrift());
    finish_task(finish_request);
    remove_task_info(req.task_type, req.signature);
}
1830
3.26k
1831
3.26k
// Handles a drop-tablet agent task for the local StorageEngine.
//
// The tablet is looked up first (second get_tablet() argument `false`) so that its data dir
// and uid are still available after the drop; a missing tablet yields a NotFound status.
// After a successful drop, the transactions related to the tablet are force-rolled-back.
// The result, together with a TTabletInfo built from the request, is reported through
// finish_task(), followed by remove_task_info().
//
// The outcome is logged exactly once: the earlier unconditional "successfully drop tablet"
// message, which was also emitted after a failed drop, has been removed.
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& drop_tablet_req = req.drop_tablet_req;
    Status status;
    auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false);
    if (dropped_tablet != nullptr) {
        status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id,
                                                      drop_tablet_req.replica_id,
                                                      drop_tablet_req.is_drop_table_or_partition);
    } else {
        status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id);
    }
    if (status.ok()) {
        // if tablet is dropped by fe, then the related txn should also be removed
        // (status.ok() implies dropped_tablet is non-null, see the branch above)
        engine.txn_manager()->force_rollback_tablet_related_txns(
                dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
                dropped_tablet->tablet_uid());
        LOG_INFO("successfully drop tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", drop_tablet_req.tablet_id)
                .tag("replica_id", drop_tablet_req.replica_id);
    } else {
        LOG_WARNING("failed to drop tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", drop_tablet_req.tablet_id)
                .tag("replica_id", drop_tablet_req.replica_id)
                .error(status);
    }

    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());

    // The tablet info is attached for both successful and failed drops.
    TTabletInfo tablet_info;
    tablet_info.tablet_id = drop_tablet_req.tablet_id;
    tablet_info.schema_hash = drop_tablet_req.schema_hash;
    tablet_info.version = 0;
    // Useless but it is a required field in TTabletInfo
    tablet_info.version_hash = 0;
    tablet_info.row_count = 0;
    tablet_info.data_size = 0;

    finish_task_request.__set_finish_tablet_infos({tablet_info});

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
1882
0
1883
0
// Handles a drop-tablet agent task for the CloudStorageEngine.
//
// Scans the tablet manager's weak tablet list for the requested tablet, recycles the cached
// data of its snapshot rowsets and then erases the tablet from the tablet manager. If the
// tablet is not found, a warning is logged and nothing is erased. remove_task_info() runs on
// every exit path through the Defer below; this callback does not call finish_task().
void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& request = req.drop_tablet_req;
    // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe
    Defer defer = [&] { remove_task_info(req.task_type, req.signature); };
    DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
        LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
                .tag("tablet_id", request.tablet_id);
        return;
    });
    MonotonicStopWatch stopwatch;
    stopwatch.start();

    // Only the first few rowset ids are kept, purely for the final log line.
    constexpr int kMaxLoggedRowsets = 10;
    std::ostringstream logged_rowset_ids;
    bool found = false;

    auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
    for (auto& weak_tablet : weak_tablets) {
        auto tablet = weak_tablet.lock();
        if (tablet == nullptr || tablet->tablet_id() != request.tablet_id) {
            continue;
        }

        found = true;
        auto rowsets_to_clean = tablet->get_snapshot_rowset(true);
        int logged = 0;
        for (const auto& rowset : rowsets_to_clean) {
            if (logged == kMaxLoggedRowsets) {
                break;
            }
            if (logged != 0) {
                logged_rowset_ids << ",";
            }
            logged_rowset_ids << rowset->rowset_id().to_string();
            ++logged;
        }

        CloudTablet::recycle_cached_data(rowsets_to_clean);
        break;
    }

    if (!found) {
        LOG(WARNING) << "tablet not found when dropping tablet_id=" << request.tablet_id
                     << ", cost " << static_cast<double>(stopwatch.elapsed_time()) / 1e9 << "(s)";
        return;
    }

    engine.tablet_mgr().erase_tablet(request.tablet_id);
    LOG(INFO) << "drop cloud tablet_id=" << request.tablet_id
              << " and clean file cache first 10 rowsets {" << logged_rowset_ids.str()
              << "}, cost " << static_cast<double>(stopwatch.elapsed_time()) / 1e9 << "(s)";
}
1933
22
1934
22
// Runs a PUSH agent task through EngineBatchLoadTask and reports the result to FE.
//
// The finish request always carries backend, task type, signature, task status
// and report version. The request version is attached only for DELETE pushes,
// and the finished tablet infos only when the engine task succeeded.
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& request = req.push_req;

    LOG(INFO) << "get push task. signature=" << req.signature
              << " push_type=" << request.push_type;

    // The request reaches us through
    //   task_worker_pool <- agent_server <- backend_service <- BackendService
    // where BackendService_submit_tasks_args.tasks is not const, so stripping
    // const here is acceptable: the engine task is expected to modify the
    // push request.
    std::vector<TTabletInfo> finished_tablets;
    EngineBatchLoadTask load_task(engine, const_cast<TPushReq&>(request), &finished_tablets);
    SCOPED_ATTACH_TASK(load_task.mem_tracker());
    auto exec_status = load_task.execute();

    // Build the response for FE.
    TFinishTaskRequest response;
    response.__set_backend(BackendOptions::get_local_backend());
    response.__set_task_type(req.task_type);
    response.__set_signature(req.signature);
    if (request.push_type == TPushType::DELETE) {
        response.__set_request_version(request.version);
    }

    if (!exec_status.ok()) {
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("push_type", request.push_type)
                .error(exec_status);
    } else {
        LOG_INFO("successfully execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("push_type", request.push_type);
        increase_report_version();
        response.__set_finish_tablet_infos(finished_tablets);
    }
    response.__set_task_status(exec_status.to_thrift());
    // Read after the possible bump above so FE sees the latest report version.
    response.__set_report_version(s_report_version);

    finish_task(response);
    remove_task_info(req.task_type, req.signature);
}
1977
3.22k
1978
// Runs a PUSH agent task in cloud mode and reports the result to FE.
//
// Only TPushType::DELETE is supported in cloud mode. Any other push type is
// answered with a NotSupported status. Every exit path sends the finish
// request and removes the task info entry; previously the unsupported branch
// returned early without doing either, so FE never received a reply and the
// task info for this signature was never removed.
void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& push_req = req.push_req;

    LOG(INFO) << "get push task. signature=" << req.signature
              << " push_type=" << push_req.push_type;

    // Return result to fe
    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);

    // Only support DELETE in cloud mode now
    if (push_req.push_type != TPushType::DELETE) {
        auto unsupported = Status::NotSupported("push_type {} not is supported",
                                                std::to_string(push_req.push_type));
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type)
                .error(unsupported);
        finish_task_request.__set_task_status(unsupported.to_thrift());
        finish_task_request.__set_report_version(s_report_version);

        // Report the rejection and release the task slot, exactly like the
        // regular failure path below.
        finish_task(finish_task_request);
        remove_task_info(req.task_type, req.signature);
        return;
    }

    finish_task_request.__set_request_version(push_req.version);

    DorisMetrics::instance()->delete_requests_total->increment(1);
    auto st = CloudDeleteTask::execute(engine, req.push_req);
    if (st.ok()) {
        LOG_INFO("successfully execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type);
        increase_report_version();
        auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back();
        // Just need tablet_id
        tablet_info.tablet_id = push_req.tablet_id;
        finish_task_request.__isset.finish_tablet_infos = true;
    } else {
        DorisMetrics::instance()->delete_requests_failed->increment(1);
        LOG_WARNING("failed to execute push task")
                .tag("signature", req.signature)
                .tag("tablet_id", push_req.tablet_id)
                .tag("push_type", push_req.push_type)
                .error(st);
    }

    finish_task_request.__set_task_status(st.to_thrift());
    finish_task_request.__set_report_version(s_report_version);

    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
2028
2029
// Builds the "PUBLISH_VERSION" worker pool with
// config::publish_version_worker_count workers. Every task handed to the pool
// is forwarded to publish_version_callback(); the storage engine reference is
// kept for use by that callback.
PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
        : TaskWorkerPool(
                  "PUBLISH_VERSION", config::publish_version_worker_count,
                  [this](const TAgentTaskRequest& request) { publish_version_callback(request); }),
          _engine(engine) {}

// Nothing beyond the default member/base teardown is required.
PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
2035
2036
2.49k
void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) {
2037
2.49k
    const auto& publish_version_req = req.publish_version_req;
2038
    DorisMetrics::instance()->publish_task_request_total->increment(1);
2039
2.49k
    VLOG_NOTICE << "get publish version task. signature=" << req.signature;
2040
2.49k
2041
2.49k
    std::set<TTabletId> error_tablet_ids;
2042
2.49k
    std::map<TTabletId, TVersion> succ_tablets;
2043
2.49k
    // partition_id, tablet_id, publish_version
2044
2.49k
    std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
2045
2.49k
    std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
2046
2.49k
    uint32_t retry_time = 0;
2047
2.49k
    Status status;
2048
2.49k
    constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
2049
2.49k
    while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
2050
2.49k
        succ_tablets.clear();
2051
2.49k
        error_tablet_ids.clear();
2052
2.49k
        table_id_to_tablet_id_to_num_delta_rows.clear();
2053
2.49k
        EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
2054
2.49k
                                             &succ_tablets, &discontinuous_version_tablets,
2055
2.49k
                                             &table_id_to_tablet_id_to_num_delta_rows);
2056
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2057
0
        status = engine_task.execute();
2058
        if (status.ok()) {
2059
            break;
2060
0
        }
2061
0
2062
0
        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
2063
0
            // there are too many missing versions, it has been be added to async
2064
0
            // publish task, so no need to retry here.
2065
            if (discontinuous_version_tablets.empty()) {
2066
0
                break;
2067
0
            }
2068
0
            LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
2069
0
                                   << "transaction_id: " << publish_version_req.transaction_id;
2070
0
2071
0
            int64_t time_elapsed = time(nullptr) - req.recv_time;
2072
            if (time_elapsed > config::publish_version_task_timeout_s) {
2073
                LOG(INFO) << "task elapsed " << time_elapsed
2074
0
                          << " seconds since it is inserted to queue, it is timeout";
2075
0
                break;
2076
0
            }
2077
0
2078
0
            // Version not continuous, put to queue and wait pre version publish task execute
2079
0
            PUBLISH_VERSION_count << 1;
2080
0
            auto st = _thread_pool->submit_func([this, req] {
2081
0
                this->publish_version_callback(req);
2082
0
                PUBLISH_VERSION_count << -1;
2083
0
            });
2084
0
            if (!st.ok()) [[unlikely]] {
2085
0
                PUBLISH_VERSION_count << -1;
2086
                status = std::move(st);
2087
0
            } else {
2088
0
                return;
2089
0
            }
2090
0
        }
2091
0
2092
0
        LOG_WARNING("failed to publish version")
2093
0
                .tag("transaction_id", publish_version_req.transaction_id)
2094
                .tag("error_tablets_num", error_tablet_ids.size())
2095
2.49k
                .tag("retry_time", retry_time)
2096
0
                .error(status);
2097
0
        ++retry_time;
2098
0
    }
2099
2.49k
2100
2.49k
    for (auto& item : discontinuous_version_tablets) {
2101
0
        _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
2102
                                       publish_version_req.transaction_id, false);
2103
    }
2104
0
    TFinishTaskRequest finish_task_request;
2105
0
    if (!status.ok()) [[unlikely]] {
2106
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2107
0
        // if publish failed, return failed, FE will ignore this error and
2108
0
        // check error tablet ids and FE will also republish this task
2109
2.49k
        LOG_WARNING("failed to publish version")
2110
2.49k
                .tag("signature", req.signature)
2111
2.49k
                .tag("transaction_id", publish_version_req.transaction_id)
2112
2.49k
                .tag("error_tablets_num", error_tablet_ids.size())
2113
15.1k
                .error(status);
2114
15.1k
    } else {
2115
15.1k
        if (!config::disable_auto_compaction &&
2116
15.1k
            (!config::enable_compaction_pause_on_high_memory ||
2117
15.1k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
2118
15.1k
            for (auto [tablet_id, _] : succ_tablets) {
2119
15.1k
                TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
2120
15.1k
                if (tablet != nullptr) {
2121
15.1k
                    if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
2122
15.1k
                        tablet->published_count.fetch_add(1);
2123
15.1k
                        int64_t published_count = tablet->published_count.load();
2124
0
                        int32_t max_version_config = tablet->max_version_config();
2125
0
                        if (tablet->exceed_version_limit(
2126
0
                                    max_version_config *
2127
0
                                    config::load_trigger_compaction_version_percent / 100) &&
2128
0
                            published_count % 20 == 0) {
2129
0
                            auto st = _engine.submit_compaction_task(
2130
0
                                    tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
2131
0
                            if (!st.ok()) [[unlikely]] {
2132
0
                                LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
2133
0
                                             << ", published=" << published_count << " : " << st;
2134
15.1k
                            } else {
2135
15.1k
                                LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
2136
0
                                          << ", published:" << published_count;
2137
0
                            }
2138
15.1k
                        }
2139
2.49k
                    }
2140
2.49k
                } else {
2141
2.49k
                    LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
2142
2.49k
                }
2143
2.49k
            }
2144
2.49k
        }
2145
2.49k
        int64_t cost_second = time(nullptr) - req.recv_time;
2146
2.49k
        g_publish_version_latency << cost_second;
2147
2.49k
        LOG_INFO("successfully publish version")
2148
                .tag("signature", req.signature)
2149
2.49k
                .tag("transaction_id", publish_version_req.transaction_id)
2150
2.49k
                .tag("tablets_num", succ_tablets.size())
2151
2.49k
                .tag("cost(s)", cost_second);
2152
2.49k
    }
2153
2.49k
2154
2.49k
    status.to_thrift(&finish_task_request.task_status);
2155
2.49k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2156
2.49k
    finish_task_request.__set_task_type(req.task_type);
2157
2.49k
    finish_task_request.__set_signature(req.signature);
2158
2.49k
    finish_task_request.__set_report_version(s_report_version);
2159
2.49k
    finish_task_request.__set_succ_tablets(succ_tablets);
2160
2.49k
    finish_task_request.__set_error_tablet_ids(
2161
2.49k
            std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
2162
    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
2163
14
            table_id_to_tablet_id_to_num_delta_rows);
2164
14
    finish_task(finish_task_request);
2165
14
    remove_task_info(req.task_type, req.signature);
2166
14
}
2167
14
2168
// Handles a clear-transaction agent task.
//
// A transaction id that is not greater than zero is only logged as invalid.
// Otherwise the transaction is cleared either for the listed partitions or,
// when no partition is given, through the overload that takes only the
// transaction id. The reply to FE always carries a default-constructed Status.
void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& clear_req = req.clear_transaction_task_req;
    LOG(INFO) << "get clear transaction task. signature=" << req.signature
              << ", transaction_id=" << clear_req.transaction_id
              << ", partition_id_size=" << clear_req.partition_id.size();

    Status status;

    if (clear_req.transaction_id <= 0) {
        // Nothing to clear for a non-positive transaction id.
        LOG(WARNING) << "invalid transaction id " << clear_req.transaction_id
                     << ". signature= " << req.signature;
    } else {
        if (clear_req.partition_id.empty()) {
            engine.clear_transaction_task(clear_req.transaction_id);
        } else {
            engine.clear_transaction_task(clear_req.transaction_id, clear_req.partition_id);
        }
        LOG(INFO) << "finish to clear transaction task. signature=" << req.signature
                  << ", transaction_id=" << clear_req.transaction_id;
    }

    TFinishTaskRequest response;
    response.__set_task_status(status.to_thrift());
    response.__set_backend(BackendOptions::get_local_backend());
    response.__set_task_type(req.task_type);
    response.__set_signature(req.signature);

    finish_task(response);
    remove_task_info(req.task_type, req.signature);
}
2202
0
2203
0
// Handles an alter-tablet agent task.
//
// When the request carries recv_time and has waited in the queue for more than
// 20 * config::report_task_interval_seconds, the task is considered timed out:
// alter_tablet() is not run and no finish request is sent. In every case the
// fragment-executing counter is decremented, the last-active timestamp is
// refreshed and the task info entry is removed.
void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    int64_t task_signature = req.signature;
    LOG(INFO) << "get alter table task, signature: " << task_signature;

    bool timed_out = false;
    if (req.__isset.recv_time) {
        const int64_t waited_seconds = time(nullptr) - req.recv_time;
        timed_out = waited_seconds > config::report_task_interval_seconds * 20;
        if (timed_out) {
            LOG(INFO) << "task elapsed " << waited_seconds
                      << " seconds since it is inserted to queue, it is timeout";
        }
    }

    if (!timed_out) {
        TFinishTaskRequest response;
        TTaskType::type type_of_task = req.task_type;
        alter_tablet(engine, req, task_signature, type_of_task, &response);
        finish_task(response);
    }

    doris::g_fragment_executing_count << -1;
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    g_fragment_last_active_time.set_value(now_ms);
    remove_task_info(req.task_type, req.signature);
}
2228
10.5k
2229
10.5k
// Handles an alter-tablet agent task in cloud mode.
//
// When the request carries recv_time and has waited in the queue for more than
// 20 * config::report_task_interval_seconds, the task is considered timed out:
// alter_cloud_tablet() is not run and no finish request is sent. Afterwards the
// fragment counters are updated, alter_version is reset to -1 on both tablets
// named by the request (when they can be obtained), and finally the task info
// entry is removed.
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    int64_t task_signature = req.signature;
    LOG(INFO) << "get alter table task, signature: " << task_signature;

    bool timed_out = false;
    if (req.__isset.recv_time) {
        const int64_t waited_seconds = time(nullptr) - req.recv_time;
        timed_out = waited_seconds > config::report_task_interval_seconds * 20;
        if (timed_out) {
            LOG(INFO) << "task elapsed " << waited_seconds
                      << " seconds since it is inserted to queue, it is timeout";
        }
    }

    if (!timed_out) {
        TFinishTaskRequest response;
        TTaskType::type type_of_task = req.task_type;
        alter_cloud_tablet(engine, req, task_signature, type_of_task, &response);
        finish_task(response);
    }

    doris::g_fragment_executing_count << -1;
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    g_fragment_last_active_time.set_value(now_ms);

    // The alter_version reset must happen BEFORE remove_task_info():
    // remove_task_info() allows a task with the same signature to be submitted
    // again, and that task's pre_submit_callback sets alter_version. Resetting
    // afterwards could wipe the freshly set value.
    if (req.__isset.alter_tablet_req_v2) {
        const auto& alter_request = req.alter_tablet_req_v2;
        auto target_tablet = engine.tablet_mgr().get_tablet(alter_request.new_tablet_id);
        auto source_tablet = engine.tablet_mgr().get_tablet(alter_request.base_tablet_id);
        if (target_tablet.has_value()) {
            target_tablet.value()->set_alter_version(-1);
        }
        if (source_tablet.has_value()) {
            source_tablet.value()->set_alter_version(-1);
        }
    }

    remove_task_info(req.task_type, req.signature);
}
2270
2.42k
2271
2.42k
// Records the request's alter_version on both the base and the new tablet.
//
// Nothing is done when:
//   - the request has no alter_tablet_req_v2,
//   - alter_version is <= 1,
//   - the new tablet cannot be obtained or is already TABLET_RUNNING,
//   - the base tablet cannot be obtained.
void set_alter_version_before_enqueue(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    if (!req.__isset.alter_tablet_req_v2 || req.alter_tablet_req_v2.alter_version <= 1) {
        return;
    }
    const auto& alter_request = req.alter_tablet_req_v2;

    auto target_tablet = engine.tablet_mgr().get_tablet(alter_request.new_tablet_id);
    if (!target_tablet.has_value()) {
        return;
    }
    if (target_tablet.value()->tablet_state() == TABLET_RUNNING) {
        return;
    }

    auto source_tablet = engine.tablet_mgr().get_tablet(alter_request.base_tablet_id);
    if (!source_tablet.has_value()) {
        return;
    }

    target_tablet.value()->set_alter_version(alter_request.alter_version);
    source_tablet.value()->set_alter_version(alter_request.alter_version);
    LOG(INFO) << "set alter_version=" << alter_request.alter_version
              << " before enqueue, base_tablet=" << alter_request.base_tablet_id
              << ", new_tablet=" << alter_request.new_tablet_id;
}
2293
2294
0
// Handles a binlog GC agent task: collects a tablet_id -> version map from the
// request and passes it to StorageEngine::gc_binlogs().
//
// A request without gc_binlog_req, or without tablet_gc_binlog_infos inside it,
// is logged and ignored. If a tablet id appears more than once, the first
// entry wins (emplace does not overwrite).
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    if (!req.__isset.gc_binlog_req) {
        LOG(WARNING) << "gc binlog task is not valid";
        return;
    }
    const auto& gc_request = req.gc_binlog_req;
    if (!gc_request.__isset.tablet_gc_binlog_infos) {
        LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
        return;
    }

    std::unordered_map<int64_t, int64_t> version_by_tablet;
    for (const auto& info : gc_request.tablet_gc_binlog_infos) {
        version_by_tablet.emplace(info.tablet_id, info.version);
    }

    engine.gc_binlogs(version_by_tablet);
}
2313
0
2314
0
// Forwards the partition -> visible version mapping carried by the request to
// the tablet manager.
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& partition_versions = req.visible_version_req.partition_version;
    engine.tablet_manager()->update_partitions_visible_version(partition_versions);
}
2319
2320
0
// Runs a CLONE agent task through EngineCloneTask and reports the result to FE.
//
// On success the reply additionally carries the finished tablet infos, the copy
// size and the copy time; the report version is bumped and attached only when
// the engine task says a new tablet was created. On failure only the common
// fields (backend, task type, signature, status) are sent.
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
                    const TAgentTaskRequest& req) {
    const auto& request = req.clone_req;

    DorisMetrics::instance()->clone_requests_total->increment(1);
    LOG(INFO) << "get clone task. signature=" << req.signature;

    std::vector<TTabletInfo> cloned_tablets;
    EngineCloneTask clone_task(engine, request, cluster_info, req.signature, &cloned_tablets);
    SCOPED_ATTACH_TASK(clone_task.mem_tracker());
    auto exec_status = clone_task.execute();

    // Build the response for FE.
    TFinishTaskRequest response;
    response.__set_backend(BackendOptions::get_local_backend());
    response.__set_task_type(req.task_type);
    response.__set_signature(req.signature);
    response.__set_task_status(exec_status.to_thrift());

    if (exec_status.ok()) {
        LOG_INFO("successfully clone tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .tag("copy_size", clone_task.get_copy_size())
                .tag("copy_time_ms", clone_task.get_copy_time_ms());

        if (clone_task.is_new_tablet()) {
            increase_report_version();
            response.__set_report_version(s_report_version);
        }
        response.__set_finish_tablet_infos(cloned_tablets);
        response.__set_copy_size(clone_task.get_copy_size());
        response.__set_copy_time_ms(clone_task.get_copy_time_ms());
    } else {
        DorisMetrics::instance()->clone_requests_failed->increment(1);
        LOG_WARNING("failed to clone tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", request.tablet_id)
                .error(exec_status);
    }

    finish_task(response);
    remove_task_info(req.task_type, req.signature);
}
2363
9.15k
2364
9.15k
// Handles a storage-medium migration agent task.
//
// The request is validated by check_migrate_request(), which also supplies the
// tablet and the destination data dir. The migration task runs only if that
// check succeeds. A FILE_ALREADY_EXIST outcome is reported to FE as success.
void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& migrate_req = req.storage_medium_migrate_req;

    // Validate the request and fetch what the migration needs.
    TabletSharedPtr tablet;
    DataDir* target_dir = nullptr;
    auto result = check_migrate_request(engine, migrate_req, tablet, &target_dir);
    if (result.ok()) {
        EngineStorageMigrationTask migration_task(engine, tablet, target_dir);
        SCOPED_ATTACH_TASK(migration_task.mem_tracker());
        result = migration_task.execute();
    }

    // FE should not see this particular error, so turn it into OK.
    if (result.is<FILE_ALREADY_EXIST>()) {
        result = Status::OK();
    }

    if (result.ok()) {
        LOG_INFO("successfully migrate storage medium")
                .tag("signature", req.signature)
                .tag("tablet_id", migrate_req.tablet_id);
    } else {
        LOG_WARNING("failed to migrate storage medium")
                .tag("signature", req.signature)
                .tag("tablet_id", migrate_req.tablet_id)
                .error(result);
    }

    TFinishTaskRequest response;
    response.__set_backend(BackendOptions::get_local_backend());
    response.__set_task_type(req.task_type);
    response.__set_signature(req.signature);
    response.__set_task_status(result.to_thrift());

    finish_task(response);
    remove_task_info(req.task_type, req.signature);
}
2401
0
2402
26.4k
// Runs a calc-delete-bitmap agent task through CloudEngineCalcDeleteBitmapTask
// and reports the result to FE.
//
// The reply carries the task status, the report version, the tablets the
// engine task reported as failed, and the partitions of the request echoed
// back. The successful tablet ids are collected by the engine task but are not
// part of the reply.
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& bitmap_req = req.calc_delete_bitmap_req;

    std::vector<TTabletId> failed_tablets;
    std::vector<TTabletId> succeeded_tablets;
    Status result;
    failed_tablets.clear();

    CloudEngineCalcDeleteBitmapTask bitmap_task(engine, bitmap_req, &failed_tablets,
                                                &succeeded_tablets);
    SCOPED_ATTACH_TASK(bitmap_task.mem_tracker());
    if (req.signature != bitmap_req.transaction_id) {
        // The signature is not guaranteed to equal the transaction id, so log
        // both to allow correlating them.
        LOG_INFO("begin to execute calc delete bitmap task")
                .tag("signature", req.signature)
                .tag("transaction_id", bitmap_req.transaction_id);
    }
    result = bitmap_task.execute();

    TFinishTaskRequest response;
    if (!result) {
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
        LOG_WARNING("failed to calculate delete bitmap")
                .tag("signature", req.signature)
                .tag("transaction_id", bitmap_req.transaction_id)
                .tag("error_tablets_num", failed_tablets.size())
                .error(result);
    }

    result.to_thrift(&response.task_status);
    response.__set_backend(BackendOptions::get_local_backend());
    response.__set_task_type(req.task_type);
    response.__set_signature(req.signature);
    response.__set_report_version(s_report_version);
    response.__set_error_tablet_ids(failed_tablets);
    response.__set_resp_partitions(bitmap_req.partitions);

    finish_task(response);
    remove_task_info(req.task_type, req.signature);
}
2440
180k
2441
180k
void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
2442
180k
                                              const TAgentTaskRequest& req) {
2443
180k
    if (!config::enable_cloud_make_rs_visible_on_be) {
2444
26.4k
        return;
2445
26.4k
    }
2446
26.4k
    LOG(INFO) << "begin to make cloud tmp rs visible, txn_id="
2447
              << req.make_cloud_tmp_rs_visible_req.txn_id
2448
0
              << ", tablet_count=" << req.make_cloud_tmp_rs_visible_req.tablet_ids.size();
2449
0
2450
0
    const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req;
2451
0
    auto& tablet_mgr = engine.tablet_mgr();
2452
0
2453
0
    int64_t txn_id = make_visible_req.txn_id;
2454
0
    int64_t version_update_time_ms = make_visible_req.__isset.version_update_time_ms
2455
                                             ? make_visible_req.version_update_time_ms
2456
4
                                             : 0;
2457
4
2458
    // Process each tablet involved in this transaction on this BE
2459
4
    for (int64_t tablet_id : make_visible_req.tablet_ids) {
2460
4
        auto tablet_result =
2461
4
                tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false,
2462
                                      /* sync_delete_bitmap */ false,
2463
4
                                      /* sync_stats */ nullptr, /* force_use_only_cached */ true,
2464
0
                                      /* cache_on_miss */ false);
2465
0
        if (!tablet_result.has_value()) {
2466
            continue;
2467
4
        }
2468
4
        auto cloud_tablet = tablet_result.value();
2469
2470
1.34k
        int64_t partition_id = cloud_tablet->partition_id();
2471
1.34k
        auto version_iter = make_visible_req.partition_version_map.find(partition_id);
2472
1.34k
        if (version_iter == make_visible_req.partition_version_map.end()) {
2473
1.34k
            continue;
2474
26.2k
        }
2475
26.2k
        int64_t visible_version = version_iter->second;
2476
26.2k
        DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", {
2477
1.34k
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
2478
1.34k
            auto target_table_id = dp->param<int64_t>("table_id", -1);
2479
1.34k
            auto version = dp->param<int64_t>("version", -1);
2480
1.34k
            if ((target_tablet_id == tablet_id || target_table_id == cloud_tablet->table_id()) &&
2481
1.34k
                version == visible_version) {
2482
0
                DBUG_BLOCK
2483
0
            }
2484
1.34k
        });
2485
        cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version,
2486
                                                    version_update_time_ms);
2487
    }
2488
    LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id
2489
              << ", processed_tablets=" << make_visible_req.tablet_ids.size();
2490
}
2491
2492
// Handles a clean-trash agent task: runs a trash sweep on the storage engine
// and then notifies the "REPORT_DISK_STATE" listener. The results of both calls
// are deliberately discarded. The request payload is not consulted.
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "clean trash start";

    // Debug point: when enabled, stalls this callback for 100 seconds.
    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })

    static_cast<void>(engine.start_trash_sweep(nullptr, true));
    static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));

    LOG(INFO) << "clean trash finish";
}
2499
2500
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2501
    const auto& clean_req = req.clean_udf_cache_req;
2502
2503
    if (doris::config::enable_java_support) {
2504
        static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2505
    }
2506
2507
    if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2508
        UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
2509
    }
2510
2511
    LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
2512
}
2513
2514
void report_index_policy_callback(const ClusterInfo* cluster_info) {
2515
    TReportRequest request;
2516
    auto& index_policy_list = request.index_policy;
2517
    const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys();
2518
    for (const auto& policy : policys) {
2519
        index_policy_list.emplace_back(policy.second);
2520
    }
2521
    request.__isset.index_policy = true;
2522
    request.__set_backend(BackendOptions::get_local_backend());
2523
    bool succ = handle_report(request, cluster_info, "index_policy");
2524
    report_index_policy_total << 1;
2525
    if (!succ) [[unlikely]] {
2526
        report_index_policy_failed << 1;
2527
    }
2528
}
2529
2530
#include "common/compile_check_end.h"
2531
} // namespace doris