Coverage Report

Created: 2026-06-03 03:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/storage_engine.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/storage_engine.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/AgentService_types.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/Types_types.h>
25
#include <glog/logging.h>
26
#include <rapidjson/document.h>
27
#include <rapidjson/encodings.h>
28
#include <rapidjson/prettywriter.h>
29
#include <rapidjson/stringbuffer.h>
30
#include <sys/resource.h>
31
#include <thrift/protocol/TDebugProtocol.h>
32
33
#include <algorithm>
34
#include <boost/algorithm/string/case_conv.hpp>
35
#include <boost/container/detail/std_fwd.hpp>
36
#include <cassert>
37
#include <cerrno> // IWYU pragma: keep
38
#include <chrono>
39
#include <cstdlib>
40
#include <cstring>
41
#include <filesystem>
42
#include <iterator>
43
#include <memory>
44
#include <mutex>
45
#include <ostream>
46
#include <set>
47
#include <thread>
48
#include <unordered_set>
49
#include <utility>
50
51
#include "agent/task_worker_pool.h"
52
#include "cloud/cloud_storage_engine.h"
53
#include "common/config.h"
54
#include "common/logging.h"
55
#include "common/metrics/doris_metrics.h"
56
#include "common/metrics/metrics.h"
57
#include "common/status.h"
58
#include "core/assert_cast.h"
59
#include "io/fs/local_file_system.h"
60
#include "load/memtable/memtable_flush_executor.h"
61
#include "load/stream_load/stream_load_recorder.h"
62
#include "runtime/exec_env.h"
63
#include "storage/binlog.h"
64
#include "storage/compaction/single_replica_compaction.h"
65
#include "storage/data_dir.h"
66
#include "storage/id_manager.h"
67
#include "storage/olap_common.h"
68
#include "storage/olap_define.h"
69
#include "storage/rowset/rowset_fwd.h"
70
#include "storage/rowset/rowset_meta.h"
71
#include "storage/rowset/rowset_meta_manager.h"
72
#include "storage/rowset/unique_rowset_id_generator.h"
73
#include "storage/snapshot/snapshot_manager.h"
74
#include "storage/tablet/tablet_manager.h"
75
#include "storage/tablet/tablet_meta.h"
76
#include "storage/tablet/tablet_meta_manager.h"
77
#include "storage/txn/txn_manager.h"
78
#include "util/client_cache.h"
79
#include "util/mem_info.h"
80
#include "util/stopwatch.hpp"
81
#include "util/thread.h"
82
#include "util/threadpool.h"
83
#include "util/thrift_rpc_helper.h"
84
#include "util/uid_util.h"
85
#include "util/work_thread_pool.hpp"
86
87
using std::filesystem::directory_iterator;
88
using std::filesystem::path;
89
using std::map;
90
using std::set;
91
using std::string;
92
using std::stringstream;
93
using std::vector;
94
95
namespace doris {
96
using namespace ErrorCode;
97
extern void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos,
98
                                   std::vector<DataDir*>& stores);
99
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
100
bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap(
101
        "max_rowsets_with_useless_delete_bitmap", 0);
102
bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version(
103
        "max_rowsets_with_useless_delete_bitmap_version", 0);
104
105
namespace {
106
bvar::Adder<uint64_t> unused_rowsets_counter("ununsed_rowsets_counter");
107
};
108
109
// Sets up the state shared by every engine type: the engine type tag, a rowset id generator
// constructed from the backend uid, the background-thread stop latch (count 1), the schema
// change memory budget and two bvar status metrics for delete bitmap scores.
BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid)
        : _type(type),
          _rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)),
          _stop_background_threads_latch(1) {
    // Schema change budget = soft memory limit scaled by the configured fraction.
    const auto soft_limit_bytes = static_cast<double>(MemInfo::soft_mem_limit());
    _memory_limitation_bytes_for_schema_change =
            static_cast<int64_t>(soft_limit_bytes * config::schema_change_mem_limit_frac);

    using ScoreMetric = bvar::Status<size_t>;
    _tablet_max_delete_bitmap_score_metrics =
            std::make_shared<ScoreMetric>("tablet_max", "delete_bitmap_score", 0);
    _tablet_max_base_rowset_delete_bitmap_score_metrics =
            std::make_shared<ScoreMetric>("tablet_max_base_rowset", "delete_bitmap_score", 0);
}
120
121
557
// Defaulted destructor, defined out of line in this translation unit.
BaseStorageEngine::~BaseStorageEngine() = default;
122
123
260k
// Returns the next id produced by this engine's rowset id generator.
RowsetId BaseStorageEngine::next_rowset_id() {
    auto& id_generator = *_rowset_id_generator;
    return id_generator.next_id();
}
126
127
116k
// Downcasts this engine to StorageEngine. CHECK-fails if the engine type is not LOCAL.
StorageEngine& BaseStorageEngine::to_local() {
    CHECK_EQ(_type, Type::LOCAL);
    auto* local_engine = static_cast<StorageEngine*>(this);
    return *local_engine;
}
131
132
1.99M
// Downcasts this engine to CloudStorageEngine. CHECK-fails if the engine type is not CLOUD.
CloudStorageEngine& BaseStorageEngine::to_cloud() {
    CHECK_EQ(_type, Type::CLOUD);
    auto* cloud_engine = static_cast<CloudStorageEngine*>(this);
    return *cloud_engine;
}
136
137
34.9k
int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const {
138
34.9k
    return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count,
139
34.9k
                    config::memory_limitation_per_thread_for_schema_change_bytes);
140
34.9k
}
141
142
7
void BaseStorageEngine::_start_adaptive_thread_controller() {
143
7
    if (!config::enable_adaptive_flush_threads) {
144
0
        return;
145
0
    }
146
147
7
    auto* system_metrics = DorisMetrics::instance()->system_metrics();
148
7
    auto* s3_upload_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
149
150
7
    _adaptive_thread_controller.init(system_metrics, s3_upload_pool);
151
152
7
    if (_memtable_flush_executor) {
153
7
        auto* flush_pool = _memtable_flush_executor->flush_pool();
154
7
        auto* high_prio_pool = _memtable_flush_executor->high_prio_flush_pool();
155
7
        _adaptive_thread_controller.add("flush", {flush_pool, high_prio_pool},
156
7
                                        AdaptiveThreadPoolController::make_flush_adjust_func(
157
7
                                                &_adaptive_thread_controller, flush_pool),
158
7
                                        config::max_flush_thread_num_per_cpu,
159
7
                                        config::min_flush_thread_num_per_cpu);
160
7
    }
161
7
}
162
163
50
// Creates the stream load recorder for `stream_load_record_path` and initializes it.
//
// Returns MemoryAllocFailed if the recorder could not be created, IOError if its init() fails
// (the message carries the underlying status so the root cause is not lost), OK otherwise.
Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) {
    LOG(INFO) << "stream load record path: " << stream_load_record_path;
    // init stream load record rocksdb
    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
    if (_stream_load_recorder == nullptr) {
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
                "new StreamLoadRecorder failed");
    }
    auto st = _stream_load_recorder->init();
    if (!st.ok()) {
        // Keep the IOError code, but append the original failure reason to the message.
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}, reason={}",
                                stream_load_record_path, st.to_string()),
                "init StreamLoadRecorder failed");
    }
    return Status::OK();
}
181
182
1
void CompactionSubmitRegistry::jsonfy_compaction_status(std::string* result) {
183
1
    rapidjson::Document root;
184
1
    root.SetObject();
185
186
3
    auto add_node = [&root](const std::string& name, const Registry& registry) {
187
3
        rapidjson::Value compaction_name;
188
3
        compaction_name.SetString(name.c_str(), cast_set<uint32_t>(name.length()),
189
3
                                  root.GetAllocator());
190
3
        rapidjson::Document path_obj;
191
3
        path_obj.SetObject();
192
3
        for (const auto& it : registry) {
193
0
            const auto& dir = it.first->path();
194
0
            rapidjson::Value path_key;
195
0
            path_key.SetString(dir.c_str(), cast_set<uint32_t>(dir.length()), root.GetAllocator());
196
197
0
            rapidjson::Document arr;
198
0
            arr.SetArray();
199
200
0
            for (const auto& tablet : it.second) {
201
0
                rapidjson::Value tablet_id;
202
0
                auto tablet_id_str = std::to_string(tablet->tablet_id());
203
0
                tablet_id.SetString(tablet_id_str.c_str(),
204
0
                                    cast_set<uint32_t>(tablet_id_str.length()),
205
0
                                    root.GetAllocator());
206
0
                arr.PushBack(tablet_id, root.GetAllocator());
207
0
            }
208
0
            path_obj.AddMember(path_key, arr, root.GetAllocator());
209
0
        }
210
3
        root.AddMember(compaction_name, path_obj, root.GetAllocator());
211
3
    };
212
213
1
    std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex);
214
1
    add_node("BaseCompaction", _tablet_submitted_base_compaction);
215
1
    add_node("CumulativeCompaction", _tablet_submitted_cumu_compaction);
216
1
    add_node("FullCompaction", _tablet_submitted_full_compaction);
217
218
1
    rapidjson::StringBuffer str_buf;
219
1
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf);
220
1
    root.Accept(writer);
221
1
    *result = std::string(str_buf.GetString());
222
1
}
223
224
49
// Rejects engine options that list no store paths.
static Status _validate_options(const EngineOptions& options) {
    const bool has_store_paths = !options.store_paths.empty();
    if (has_store_paths) {
        return Status::OK();
    }
    return Status::InternalError("store paths is empty");
}
230
231
49
// Validates the engine options and then opens the engine via _open(), logging the backend uid
// before and a success line after. Any failure is returned to the caller.
Status StorageEngine::open() {
    RETURN_IF_ERROR(_validate_options(_options));

    const auto backend_uid_text = _options.backend_uid.to_string();
    LOG(INFO) << "starting backend using uid:" << backend_uid_text;

    RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed");

    LOG(INFO) << "success to init storage engine.";
    return Status::OK();
}
238
239
// Constructs a LOCAL storage engine from `options`: copies the options and broken paths,
// creates the tablet manager, txn manager, create-tablet round-robin index cache and snapshot
// manager, and registers the unused_rowsets_count hook metric.
StorageEngine::StorageEngine(const EngineOptions& options)
        : BaseStorageEngine(Type::LOCAL, options.backend_uid),
          _options(options),
          _available_storage_medium_type_count(0),
          _is_all_cluster_id_exist(true),
          _stopped(false),
          _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
          _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
          _default_rowset_type(BETA_ROWSET),
          _create_tablet_idx_lru_cache(
                  new CreateTabletRRIdxCache(config::partition_disk_index_lru_size)),
          _snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
    _broken_paths = options.broken_paths;

    // NOTE(review): the hook reads _unused_rowsets.size() without taking a lock; the lock_guard
    // on _gc_mutex is deliberately left commented out here, as in the original.
    REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
        // std::lock_guard<std::mutex> lock(_gc_mutex);
        return _unused_rowsets.size();
    });
}
258
259
389
// Deregisters the unused_rowsets_count hook metric first, then stops the engine.
// stop() returns early (with a warning) if the engine was already stopped.
StorageEngine::~StorageEngine() {
    DEREGISTER_HOOK_METRIC(unused_rowsets_count);

    stop();
}
263
264
49
// Loads every data dir in `data_dirs` concurrently on a dedicated thread pool.
//
// The pool size is config::load_data_dirs_threads, or one thread per data dir when that value
// is non-positive. Once one load has failed, tasks that start afterwards return without
// loading. Returns the submit error if a task could not be submitted, otherwise the recorded
// load failure, otherwise OK.
static Status load_data_dirs(const std::vector<DataDir*>& data_dirs) {
    std::unique_ptr<ThreadPool> pool;

    int num_threads = config::load_data_dirs_threads;
    if (num_threads <= 0) {
        num_threads = cast_set<int>(data_dirs.size());
    }

    auto st = ThreadPoolBuilder("load_data_dir")
                      .set_min_threads(num_threads)
                      .set_max_threads(num_threads)
                      .build(&pool);
    CHECK(st.ok()) << st;

    std::mutex result_mtx;
    Status result;

    for (auto* data_dir : data_dirs) {
        st = pool->submit_func([&, data_dir] {
            SCOPED_INIT_THREAD_CONTEXT();
            {
                std::lock_guard lock(result_mtx);
                if (!result.ok()) { // Some data dir has failed
                    return;
                }
            }

            auto load_st = data_dir->load();
            if (!load_st.ok()) {
                LOG(WARNING) << "error occurred when init load tables. res=" << load_st
                             << ", data dir=" << data_dir->path();
                std::lock_guard lock(result_mtx);
                result = std::move(load_st);
            }
        });

        if (!st.ok()) {
            // Stop submitting, but do NOT return yet: tasks already submitted capture
            // `result` and `result_mtx` by reference, and those locals are destroyed before
            // `pool` (declared first) when this function returns.
            break;
        }
    }

    // Drain every submitted task before any captured local goes out of scope.
    pool->wait();

    if (!st.ok()) {
        return st;
    }
    return result;
}
309
310
49
// Opens the engine: initializes the store map, reconciles cluster ids, refreshes the storage
// medium count, checks the fd limit, loads all data dirs, then creates the memtable flush
// executor and the two delete bitmap calculation executors and parses the default rowset type.
Status StorageEngine::_open() {
    // Store map first; everything below relies on it.
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_store_map(), "_init_store_map failed");

    _effective_cluster_id = config::cluster_id;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to check cluster id");

    _update_storage_medium_type_count();

    RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed");

    const auto data_dirs = get_stores();
    RETURN_IF_ERROR(load_data_dirs(data_dirs));

    _disk_num = cast_set<int>(data_dirs.size());
    _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
    _memtable_flush_executor->init(_disk_num);

    _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor->init(config::calc_delete_bitmap_max_thread);

    // For the load-side executor, a non-positive config falls back to half the cores (min 1).
    const auto load_side_threads = config::calc_delete_bitmap_for_load_max_thread > 0
                                           ? config::calc_delete_bitmap_for_load_max_thread
                                           : std::max(1, CpuInfo::num_cores() / 2);
    _calc_delete_bitmap_executor_for_load = std::make_unique<CalcDeleteBitmapExecutor>();
    _calc_delete_bitmap_executor_for_load->init(load_side_threads);

    _parse_default_rowset_type();

    return Status::OK();
}
341
342
49
// Creates one DataDir per configured store path, initializes them all in parallel (one
// std::thread each) and registers them in _store_map keyed by path. If any init fails, the
// collected error messages are returned as InternalError. On success the stream load recorder
// is initialized on the first store path.
Status StorageEngine::_init_store_map() {
    std::vector<std::thread> init_workers;
    std::mutex failures_mtx;
    std::string failures;

    for (auto& store_path : _options.store_paths) {
        auto data_dir = std::make_unique<DataDir>(*this, store_path.path,
                                                  store_path.capacity_bytes,
                                                  store_path.storage_medium);
        // The worker only borrows the DataDir; ownership moves into _store_map below.
        init_workers.emplace_back([dir = data_dir.get(), &failures_mtx, &failures]() {
            SCOPED_INIT_THREAD_CONTEXT();
            auto st = dir->init();
            if (st.ok()) {
                return;
            }
            {
                std::lock_guard<std::mutex> guard(failures_mtx);
                failures.append(st.to_string() + ";");
            }
            LOG(WARNING) << "Store load failed, status=" << st.to_string()
                         << ", path=" << dir->path();
        });
        _store_map.emplace(data_dir->path(), std::move(data_dir));
    }

    for (auto& worker : init_workers) {
        worker.join();
    }

    // All store paths MUST init successfully
    if (!failures.empty()) {
        return Status::InternalError("init path failed, error={}", failures);
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
                                   "init StreamLoadRecorder failed");

    return Status::OK();
}
377
378
1.71k
void StorageEngine::_update_storage_medium_type_count() {
379
1.71k
    set<TStorageMedium::type> available_storage_medium_types;
380
381
1.71k
    std::lock_guard<std::mutex> l(_store_lock);
382
1.94k
    for (auto& it : _store_map) {
383
1.94k
        if (it.second->is_used()) {
384
1.94k
            available_storage_medium_types.insert(it.second->storage_medium());
385
1.94k
        }
386
1.94k
    }
387
388
1.71k
    _available_storage_medium_type_count =
389
1.71k
            cast_set<uint32_t>(available_storage_medium_types.size());
390
1.71k
}
391
392
49
// Reconciles the cluster id found on disk (`cluster_id`, -1 when none) with
// _effective_cluster_id (-1 when unset):
//   - no id on disk: nothing to do (a new cluster gets its id from the heartbeat message, or
//     the effective id is already the right one);
//   - id on disk, none effective: adopt the on-disk id;
//   - both present and different: Corruption.
Status StorageEngine::_judge_and_update_effective_cluster_id(int32_t cluster_id) {
    if (cluster_id == -1) {
        return Status::OK();
    }

    if (_effective_cluster_id == -1) {
        _effective_cluster_id = cluster_id;
        return Status::OK();
    }

    if (cluster_id != _effective_cluster_id) {
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::Corruption("multiple cluster ids is not equal. one={}, other={}",
                                   _effective_cluster_id, cluster_id),
                "cluster id not equal");
    }
    return Status::OK();
}
413
414
859
std::vector<DataDir*> StorageEngine::get_stores(bool include_unused) {
415
859
    std::vector<DataDir*> stores;
416
859
    stores.reserve(_store_map.size());
417
418
859
    std::lock_guard<std::mutex> l(_store_lock);
419
859
    if (include_unused) {
420
152
        for (auto&& [_, store] : _store_map) {
421
151
            stores.push_back(store.get());
422
151
        }
423
707
    } else {
424
778
        for (auto&& [_, store] : _store_map) {
425
778
            if (store->is_used()) {
426
778
                stores.push_back(store.get());
427
778
            }
428
778
        }
429
707
    }
430
859
    return stores;
431
859
}
432
433
// Fills `data_dir_infos` (cleared first) with one DataDirInfo per data dir.
//
// When `need_update` is set, each data dir's capacity is refreshed first; a failure there is
// returned immediately. The tablet manager then updates the per-path info, the resulting
// local/remote used sizes are pushed back into each DataDir, and the elapsed time is logged.
Status StorageEngine::get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos,
                                            bool need_update) {
    data_dir_infos->clear();

    MonotonicStopWatch watch;
    watch.start();

    // 1. snapshot every data dir's info, keyed by root path
    std::map<std::string, DataDirInfo> info_by_path;
    {
        std::lock_guard<std::mutex> guard(_store_lock);
        for (auto& [root_path, store] : _store_map) {
            if (need_update) {
                RETURN_IF_ERROR(store->update_capacity());
            }
            info_by_path.emplace(root_path, store->get_dir_info());
        }
    }

    // 2. let the tablet manager update the per-path info and count tablets
    size_t tablet_count = 0;
    _tablet_manager->update_root_path_info(&info_by_path, &tablet_count);

    // 3. push the used sizes back into each DataDir (lock taken per entry)
    for (auto& [root_path, info] : info_by_path) {
        std::lock_guard<std::mutex> guard(_store_lock);
        auto store_it = _store_map.find(root_path);
        DCHECK(store_it != _store_map.end());
        store_it->second->update_local_data_size(info.local_used_capacity);
        store_it->second->update_remote_data_size(info.remote_used_capacity);
    }

    // 4. hand the collected infos to the caller
    for (auto& [_, info] : info_by_path) {
        data_dir_infos->emplace_back(info);
    }

    watch.stop();
    LOG(INFO) << "get root path info cost: " << watch.elapsed_time() / 1000000
              << " ms. tablet counter: " << tablet_count;

    return Status::OK();
}
479
480
58
// Returns the size in bytes of `file_path`: the file size for a non-directory, or the
// recursive sum of all entries for a directory. A path that does not exist yields 0.
//
// Only the non-throwing (std::error_code) filesystem overloads are used. An entry that
// disappears or becomes unreadable while the tree is being walked therefore contributes 0
// bytes (or ends the walk of that directory) instead of raising
// std::filesystem::filesystem_error.
int64_t StorageEngine::get_file_or_directory_size(const std::string& file_path) {
    std::error_code ec;
    const auto path_status = std::filesystem::status(file_path, ec);
    if (ec || !std::filesystem::exists(path_status)) {
        return 0;
    }

    if (!std::filesystem::is_directory(path_status)) {
        const auto bytes = std::filesystem::file_size(file_path, ec);
        // file_size() reports failure through ec; its return value is meaningless then.
        return ec ? 0 : static_cast<int64_t>(bytes);
    }

    int64_t sum_size = 0;
    std::filesystem::directory_iterator entry(file_path, ec);
    const std::filesystem::directory_iterator end;
    while (!ec && entry != end) {
        sum_size += get_file_or_directory_size(entry->path().string());
        entry.increment(ec);
    }
    return sum_size;
}
493
494
1.66k
void StorageEngine::_start_disk_stat_monitor() {
495
1.88k
    for (auto& it : _store_map) {
496
1.88k
        it.second->health_check();
497
1.88k
    }
498
499
1.66k
    _update_storage_medium_type_count();
500
501
1.66k
    _exit_if_too_many_disks_are_failed();
502
1.66k
}
503
504
// TODO(lingbin): Should be in EnvPosix?
505
49
// Verifies that the soft RLIMIT_NOFILE limit is at least config::min_file_descriptor_number.
//
// The check passes (OK) when getrlimit() itself fails, or when the SKIP_CHECK_ULIMIT
// environment variable is exactly "true". Otherwise a too-small limit yields EXCEEDED_LIMIT.
Status StorageEngine::_check_file_descriptor_number() {
    struct rlimit fd_limit;
    if (getrlimit(RLIMIT_NOFILE, &fd_limit) != 0) {
        LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno)
                     << ", use default configuration instead.";
        return Status::OK();
    }

    const char* skip_check = getenv("SKIP_CHECK_ULIMIT");
    if (skip_check == nullptr) {
        LOG(INFO) << "will check 'ulimit' value.";
    } else if (std::string(skip_check) == "true") {
        LOG(INFO) << "the 'ulimit' value check is skipped"
                  << ", the SKIP_CHECK_ULIMIT env value is " << skip_check;
        return Status::OK();
    } else {
        LOG(INFO) << "the SKIP_CHECK_ULIMIT env value is " << skip_check
                  << ", will check ulimit value.";
    }

    if (fd_limit.rlim_cur >= config::min_file_descriptor_number) {
        return Status::OK();
    }

    LOG(ERROR) << "File descriptor number is less than " << config::min_file_descriptor_number
               << ". Please use (ulimit -n) to set a value equal or greater than "
               << config::min_file_descriptor_number;
    return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
            "file descriptors limit {} is small than {}", fd_limit.rlim_cur,
            config::min_file_descriptor_number);
}
533
534
49
// Checks that all data dirs agree on a single cluster id.
//
// Data dirs whose cluster id is incomplete are skipped, but clear _is_all_cluster_id_exist.
// Two complete data dirs with different ids yield Corruption. The agreed id (or -1) is then
// reconciled with the effective cluster id, and if an effective id exists while some data dir
// lacks one, the id is written to every data dir via set_cluster_id().
Status StorageEngine::_check_all_root_path_cluster_id() {
    int32_t agreed_cluster_id = -1;
    for (auto& [_, store] : _store_map) {
        const int32_t store_cluster_id = store->cluster_id();
        if (store->cluster_id_incomplete()) {
            _is_all_cluster_id_exist = false;
            continue;
        }
        if (store_cluster_id == agreed_cluster_id) {
            // matches what has been seen so far, nothing to do
            continue;
        }
        if (agreed_cluster_id != -1) {
            RETURN_NOT_OK_STATUS_WITH_WARN(
                    Status::Corruption("multiple cluster ids is not equal. one={}, other={}",
                                       agreed_cluster_id, store_cluster_id),
                    "cluster id not equal");
        }
        // first complete id encountered
        agreed_cluster_id = store_cluster_id;
    }

    // judge and get effective cluster id
    RETURN_IF_ERROR(_judge_and_update_effective_cluster_id(agreed_cluster_id));

    // write cluster id into cluster_id_path if get effective cluster id success
    const bool must_persist = _effective_cluster_id != -1 && !_is_all_cluster_id_exist;
    if (must_persist) {
        RETURN_IF_ERROR(set_cluster_id(_effective_cluster_id));
    }

    return Status::OK();
}
562
563
2
// Writes `cluster_id` to every data dir while holding _store_lock. Stops at, and returns, the
// first failure; on success records it as the effective cluster id and marks all cluster ids
// as present.
Status StorageEngine::set_cluster_id(int32_t cluster_id) {
    std::lock_guard<std::mutex> guard(_store_lock);
    for (auto& [_, store] : _store_map) {
        RETURN_IF_ERROR(store->set_cluster_id(cluster_id));
    }

    _is_all_cluster_id_exist = true;
    _effective_cluster_id = cluster_id;
    return Status::OK();
}
572
573
int StorageEngine::_get_and_set_next_disk_index(int64_t partition_id,
574
7.52k
                                                TStorageMedium::type storage_medium) {
575
7.52k
    auto key = CreateTabletRRIdxCache::get_key(partition_id, storage_medium);
576
7.52k
    int curr_index = _create_tablet_idx_lru_cache->get_index(key);
577
    // -1, lru can't find key
578
7.52k
    if (curr_index == -1) {
579
1.45k
        curr_index = std::max(0, _last_use_index[storage_medium] + 1);
580
1.45k
    }
581
7.52k
    _last_use_index[storage_medium] = curr_index;
582
7.52k
    _create_tablet_idx_lru_cache->set_index(key, std::max(0, curr_index + 1));
583
7.52k
    return curr_index;
584
7.52k
}
585
586
void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium,
587
7.52k
                                          std::vector<DirInfo>& dir_infos) {
588
7.52k
    std::vector<double> usages;
589
7.52k
    for (auto& it : _store_map) {
590
7.52k
        DataDir* data_dir = it.second.get();
591
7.52k
        if (data_dir->is_used()) {
592
7.52k
            if ((_available_storage_medium_type_count == 1 ||
593
7.52k
                 data_dir->storage_medium() == storage_medium) &&
594
7.52k
                !data_dir->reach_capacity_limit(0)) {
595
7.52k
                double usage = data_dir->get_usage(0);
596
7.52k
                DirInfo dir_info;
597
7.52k
                dir_info.data_dir = data_dir;
598
7.52k
                dir_info.usage = usage;
599
7.52k
                dir_info.available_level = 0;
600
7.52k
                usages.push_back(usage);
601
7.52k
                dir_infos.push_back(dir_info);
602
7.52k
            }
603
7.52k
        }
604
7.52k
    }
605
606
7.52k
    if (dir_infos.size() <= 1) {
607
7.52k
        return;
608
7.52k
    }
609
610
1
    std::sort(usages.begin(), usages.end());
611
1
    if (usages.back() < 0.7) {
612
1
        return;
613
1
    }
614
615
0
    std::vector<double> level_min_usages;
616
0
    level_min_usages.push_back(usages[0]);
617
0
    for (auto usage : usages) {
618
        // usage < 0.7 consider as one level, give a small skew
619
0
        if (usage < 0.7 - (config::high_disk_avail_level_diff_usages / 2.0)) {
620
0
            continue;
621
0
        }
622
623
        // at high usages,  default 15% is one level
624
        // for example: there disk usages are:   0.66,  0.72,  0.83
625
        // then level_min_usages = [0.66, 0.83], divide disks into 2 levels:  [0.66, 0.72], [0.83]
626
0
        if (usage >= level_min_usages.back() + config::high_disk_avail_level_diff_usages) {
627
0
            level_min_usages.push_back(usage);
628
0
        }
629
0
    }
630
0
    for (auto& dir_info : dir_infos) {
631
0
        double usage = dir_info.usage;
632
0
        for (size_t i = 1; i < level_min_usages.size() && usage >= level_min_usages[i]; i++) {
633
0
            dir_info.available_level++;
634
0
        }
635
636
        // when usage is too high, no matter consider balance now,
637
        // make it a higher level.
638
        // for example, two disks and usages are: 0.85 and 0.92, then let tablets fall on the first disk.
639
        // by default, storage_flood_stage_usage_percent = 90
640
0
        if (usage > config::storage_flood_stage_usage_percent / 100.0) {
641
0
            dir_info.available_level++;
642
0
        }
643
0
    }
644
0
}
645
646
std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(
647
7.52k
        int64_t partition_id, TStorageMedium::type storage_medium) {
648
7.52k
    std::vector<DirInfo> dir_infos;
649
7.52k
    int curr_index = 0;
650
7.52k
    std::vector<DataDir*> stores;
651
7.52k
    {
652
7.52k
        std::lock_guard<std::mutex> l(_store_lock);
653
7.52k
        curr_index = _get_and_set_next_disk_index(partition_id, storage_medium);
654
7.52k
        _get_candidate_stores(storage_medium, dir_infos);
655
7.52k
    }
656
657
7.52k
    std::sort(dir_infos.begin(), dir_infos.end());
658
7.52k
    get_round_robin_stores(curr_index, dir_infos, stores);
659
660
7.52k
    return stores;
661
7.52k
}
662
663
// maintain in stores LOW,MID,HIGH level round robin
664
void get_round_robin_stores(int64_t curr_index, const std::vector<DirInfo>& dir_infos,
665
7.52k
                            std::vector<DataDir*>& stores) {
666
15.0k
    for (size_t i = 0; i < dir_infos.size();) {
667
7.52k
        size_t end = i + 1;
668
7.52k
        while (end < dir_infos.size() &&
669
7.52k
               dir_infos[i].available_level == dir_infos[end].available_level) {
670
1
            end++;
671
1
        }
672
        // data dirs [i, end) have the same tablet size, round robin range [i, end)
673
7.52k
        size_t count = end - i;
674
15.0k
        for (size_t k = 0; k < count; k++) {
675
7.52k
            size_t index = i + ((k + curr_index) % count);
676
7.52k
            stores.push_back(dir_infos[index].data_dir);
677
7.52k
        }
678
7.52k
        i = end;
679
7.52k
    }
680
7.52k
}
681
682
176
// Looks up the data dir registered under `path`; returns nullptr when there is none.
DataDir* StorageEngine::get_store(const std::string& path) {
    // _store_map is unchanged, no need to lock
    if (auto found = _store_map.find(path); found != _store_map.end()) {
        return found->second.get();
    }
    return nullptr;
}
690
691
1.66k
static bool too_many_disks_are_failed(uint32_t unused_num, uint32_t total_num) {
692
1.66k
    return ((total_num == 0) ||
693
1.66k
            (unused_num * 100 / total_num > config::max_percentage_of_error_disk));
694
1.66k
}
695
696
1.66k
void StorageEngine::_exit_if_too_many_disks_are_failed() {
697
1.66k
    uint32_t unused_root_path_num = 0;
698
1.66k
    uint32_t total_root_path_num = 0;
699
700
1.66k
    {
701
        // TODO(yingchun): _store_map is only updated in main and ~StorageEngine, maybe we can remove it?
702
1.66k
        std::lock_guard<std::mutex> l(_store_lock);
703
1.66k
        if (_store_map.empty()) {
704
0
            return;
705
0
        }
706
707
1.88k
        for (auto& it : _store_map) {
708
1.88k
            ++total_root_path_num;
709
1.88k
            if (it.second->is_used()) {
710
1.88k
                continue;
711
1.88k
            }
712
0
            ++unused_root_path_num;
713
0
        }
714
1.66k
    }
715
716
1.66k
    if (too_many_disks_are_failed(unused_root_path_num, total_root_path_num)) {
717
0
        LOG(FATAL) << "meet too many error disks, process exit. "
718
0
                   << "max_ratio_allowed=" << config::max_percentage_of_error_disk << "%"
719
0
                   << ", error_disk_count=" << unused_root_path_num
720
0
                   << ", total_disk_count=" << total_root_path_num;
721
0
        exit(0);
722
0
    }
723
1.66k
}
724
725
392
void StorageEngine::stop() {
726
392
    if (_stopped) {
727
3
        LOG(WARNING) << "Storage engine is stopped twice.";
728
3
        return;
729
3
    }
730
    // trigger the waiting threads
731
389
    notify_listeners();
732
733
389
    {
734
389
        std::lock_guard<std::mutex> l(_store_lock);
735
389
        for (auto& store_pair : _store_map) {
736
55
            store_pair.second->stop_bg_worker();
737
55
        }
738
389
    }
739
740
389
    _stop_background_threads_latch.count_down();
741
389
#define THREAD_JOIN(thread) \
742
4.66k
    if (thread) {           \
743
36
        thread->join();     \
744
36
    }
745
746
389
    THREAD_JOIN(_compaction_tasks_producer_thread);
747
389
    THREAD_JOIN(_binlog_compaction_tasks_producer_thread);
748
389
    THREAD_JOIN(_update_replica_infos_thread);
749
389
    THREAD_JOIN(_unused_rowset_monitor_thread);
750
389
    THREAD_JOIN(_garbage_sweeper_thread);
751
389
    THREAD_JOIN(_disk_stat_monitor_thread);
752
389
    THREAD_JOIN(_cache_clean_thread);
753
389
    THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
754
389
    THREAD_JOIN(_async_publish_thread);
755
389
    THREAD_JOIN(_cold_data_compaction_producer_thread);
756
389
    THREAD_JOIN(_cooldown_tasks_producer_thread);
757
389
    THREAD_JOIN(_check_delete_bitmap_score_thread);
758
389
#undef THREAD_JOIN
759
760
389
#define THREADS_JOIN(threads)            \
761
389
    for (const auto& thread : threads) { \
762
5
        if (thread) {                    \
763
5
            thread->join();              \
764
5
        }                                \
765
5
    }
766
767
389
    THREADS_JOIN(_path_gc_threads);
768
389
#undef THREADS_JOIN
769
770
389
    if (_base_compaction_thread_pool) {
771
9
        _base_compaction_thread_pool->shutdown();
772
9
    }
773
389
    if (_cumu_compaction_thread_pool) {
774
10
        _cumu_compaction_thread_pool->shutdown();
775
10
    }
776
389
    if (_binlog_compaction_thread_pool) {
777
3
        _binlog_compaction_thread_pool->shutdown();
778
3
    }
779
389
    if (_single_replica_compaction_thread_pool) {
780
3
        _single_replica_compaction_thread_pool->shutdown();
781
3
    }
782
783
389
    if (_seg_compaction_thread_pool) {
784
14
        _seg_compaction_thread_pool->shutdown();
785
14
    }
786
389
    if (_tablet_meta_checkpoint_thread_pool) {
787
3
        _tablet_meta_checkpoint_thread_pool->shutdown();
788
3
    }
789
389
    if (_cold_data_compaction_thread_pool) {
790
3
        _cold_data_compaction_thread_pool->shutdown();
791
3
    }
792
793
389
    if (_cooldown_thread_pool) {
794
3
        _cooldown_thread_pool->shutdown();
795
3
    }
796
797
389
    _adaptive_thread_controller.stop();
798
389
    _memtable_flush_executor.reset(nullptr);
799
389
    _calc_delete_bitmap_executor.reset(nullptr);
800
389
    _calc_delete_bitmap_executor_for_load.reset();
801
802
389
    _stopped = true;
803
389
    LOG(INFO) << "Storage engine is stopped.";
804
389
}
805
806
18
void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) {
807
    // clear transaction task may not contains partitions ids, we should get partition id from txn manager.
808
18
    std::vector<int64_t> partition_ids;
809
18
    _txn_manager->get_partition_ids(transaction_id, &partition_ids);
810
18
    clear_transaction_task(transaction_id, partition_ids);
811
18
}
812
813
void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
814
20
                                           const std::vector<TPartitionId>& partition_ids) {
815
20
    LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id;
816
817
20
    for (const TPartitionId& partition_id : partition_ids) {
818
14
        std::map<TabletInfo, RowsetSharedPtr> tablet_infos;
819
14
        _txn_manager->get_txn_related_tablets(transaction_id, partition_id, &tablet_infos);
820
821
        // each tablet
822
272
        for (auto& tablet_info : tablet_infos) {
823
            // should use tablet uid to ensure clean txn correctly
824
272
            TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.first.tablet_id,
825
272
                                                                 tablet_info.first.tablet_uid);
826
            // The tablet may be dropped or altered, leave a INFO log and go on process other tablet
827
272
            if (tablet == nullptr) {
828
0
                LOG(INFO) << "tablet is no longer exist. tablet_id=" << tablet_info.first.tablet_id
829
0
                          << ", tablet_uid=" << tablet_info.first.tablet_uid;
830
0
                continue;
831
0
            }
832
272
            Status s = _txn_manager->delete_txn(partition_id, tablet, transaction_id);
833
272
            if (!s.ok()) {
834
0
                LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id
835
0
                             << ", partition_id=" << partition_id
836
0
                             << ", tablet_id=" << tablet_info.first.tablet_id
837
0
                             << ", status=" << s.to_string();
838
0
            }
839
272
        }
840
14
    }
841
20
    LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id;
842
20
}
843
844
50
// Sweeps expired snapshot and trash directories under every used data dir and
// then runs the meta clean-up steps (tablet manager sweep, unused txns, rowset
// metas, binlog metas, delete bitmaps, pending publish info, partial update
// info, remote rowset/tablet gc).
//
// usage: optional out-parameter; when non-null it receives the highest disk
//        usage ratio seen among the used data dirs.
// ignore_guard: when true the guard space is 0; if a sweep is already running
//        the _need_clean_trash flag is set instead.
// Returns OK immediately when another sweep holds _trash_sweep_lock; otherwise
// returns the status of the last failed directory sweep, or OK.
Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
    Status res = Status::OK();

    std::unique_lock<std::mutex> sweep_guard(_trash_sweep_lock, std::try_to_lock);
    if (!sweep_guard.owns_lock()) {
        LOG(INFO) << "trash and snapshot sweep is running.";
        if (ignore_guard) {
            _need_clean_trash.store(true, std::memory_order_relaxed);
        }
        return res;
    }

    LOG(INFO) << "start trash and snapshot sweep. is_clean=" << ignore_guard;

    const int32_t snapshot_expire = config::snapshot_expire_time_sec;
    const int32_t trash_expire = config::trash_file_expire_time_sec;
    // the guard space should be lower than storage_flood_stage_usage_percent,
    // so here we multiply 0.9
    // if ignore_guard is true, set guard_space to 0.
    const double guard_space =
            ignore_guard ? 0 : config::storage_flood_stage_usage_percent / 100.0 * 0.9;
    std::vector<DataDirInfo> data_dir_infos;
    RETURN_NOT_OK_STATUS_WITH_WARN(get_all_data_dir_info(&data_dir_infos, false),
                                   "failed to get root path stat info when sweep trash.")
    std::sort(data_dir_infos.begin(), data_dir_infos.end(), DataDirInfoLessAvailability());

    time_t now = time(nullptr); // get the UTC time
    tm local_tm_now;
    local_tm_now.tm_isdst = 0;
    if (localtime_r(&now, &local_tm_now) == nullptr) {
        return Status::Error<OS_ERROR>("fail to localtime_r time. time={}", now);
    }
    const time_t local_now = mktime(&local_tm_now); // get the local calendar time

    double max_usage = 0.0;
    for (DataDirInfo& info : data_dir_infos) {
        LOG(INFO) << "Start to sweep path " << info.path;
        if (!info.is_used) {
            continue;
        }

        const double curr_usage = static_cast<double>(info.disk_capacity - info.available) /
                                  static_cast<double>(info.disk_capacity);
        max_usage = std::max(max_usage, curr_usage);

        auto snapshot_path = fmt::format("{}/{}", info.path, SNAPSHOT_PREFIX);
        Status snapshot_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
        if (!snapshot_res.ok()) {
            LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path
                         << ", err_code=" << snapshot_res;
            res = snapshot_res;
        }

        auto trash_path = fmt::format("{}/{}", info.path, TRASH_PREFIX);
        // Above the guard space the trash is swept with an expire time of 0.
        const int32_t effective_trash_expire = curr_usage > guard_space ? 0 : trash_expire;
        Status trash_res = _do_sweep(trash_path, local_now, effective_trash_expire);
        if (!trash_res.ok()) {
            LOG(WARNING) << "failed to sweep trash. path=" << trash_path
                         << ", err_code=" << trash_res;
            res = trash_res;
        }
    }

    if (usage != nullptr) {
        *usage = max_usage; // update usage
    }

    // clear expire incremental rowset, move deleted tablet to trash
    RETURN_IF_ERROR(_tablet_manager->start_trash_sweep());

    // clean rubbish transactions
    _clean_unused_txns();

    // clean unused rowset metas in OlapMeta
    _clean_unused_rowset_metas();

    // clean unused binlog metas in OlapMeta
    _clean_unused_binlog_metas();

    // clean unused delete bitmap for deleted tablet
    _clean_unused_delete_bitmap();

    // clean unused pending publish info for deleted tablet
    _clean_unused_pending_publish_info();

    // clean unused partial update info for finished txns
    _clean_unused_partial_update_info();

    // clean unused rowsets in remote storage backends
    for (auto data_dir : get_stores()) {
        data_dir->perform_remote_rowset_gc();
        data_dir->perform_remote_tablet_gc();
        data_dir->update_trash_capacity();
    }

    return res;
}
941
942
50
void StorageEngine::_clean_unused_rowset_metas() {
943
50
    std::vector<RowsetMetaSharedPtr> invalid_rowset_metas;
944
50
    auto clean_rowset_func = [this, &invalid_rowset_metas](TabletUid tablet_uid, RowsetId rowset_id,
945
53.7k
                                                           std::string_view meta_str) -> bool {
946
        // return false will break meta iterator, return true to skip this error
947
53.7k
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
948
53.7k
        bool parsed = rowset_meta->init(meta_str);
949
53.7k
        if (!parsed) {
950
0
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
951
0
            rowset_meta->set_rowset_id(rowset_id);
952
0
            rowset_meta->set_tablet_uid(tablet_uid);
953
0
            invalid_rowset_metas.push_back(rowset_meta);
954
0
            return true;
955
0
        }
956
53.7k
        if (rowset_meta->tablet_uid() != tablet_uid) {
957
0
            LOG(WARNING) << "tablet uid is not equal, skip the rowset"
958
0
                         << ", rowset_id=" << rowset_meta->rowset_id()
959
0
                         << ", in_put_tablet_uid=" << tablet_uid
960
0
                         << ", tablet_uid in rowset meta=" << rowset_meta->tablet_uid();
961
0
            invalid_rowset_metas.push_back(rowset_meta);
962
0
            return true;
963
0
        }
964
965
53.7k
        TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
966
53.7k
        if (tablet == nullptr) {
967
            // tablet may be dropped
968
            // TODO(cmy): this is better to be a VLOG, because drop table is a very common case.
969
            // leave it as INFO log for observation. Maybe change it in future.
970
2.59k
            LOG(INFO) << "failed to find tablet " << rowset_meta->tablet_id()
971
2.59k
                      << " for rowset: " << rowset_meta->rowset_id() << ", tablet may be dropped";
972
2.59k
            invalid_rowset_metas.push_back(rowset_meta);
973
2.59k
            return true;
974
2.59k
        }
975
51.1k
        if (tablet->tablet_uid() != rowset_meta->tablet_uid()) {
976
            // In this case, we get the tablet using the tablet id recorded in the rowset meta.
977
            // but the uid in the tablet is different from the one recorded in the rowset meta.
978
            // How this happened:
979
            // Replica1 of Tablet A exists on BE1. Because of the clone task, a new replica2 is createed on BE2,
980
            // and then replica1 deleted from BE1. After some time, we created replica again on BE1,
981
            // which will creates a new tablet with the same id but a different uid.
982
            // And in the historical version, when we deleted the replica, we did not delete the corresponding rowset meta,
983
            // thus causing the original rowset meta to remain(with same tablet id but different uid).
984
0
            LOG(WARNING) << "rowset's tablet uid " << rowset_meta->tablet_uid()
985
0
                         << " does not equal to tablet uid: " << tablet->tablet_uid();
986
0
            invalid_rowset_metas.push_back(rowset_meta);
987
0
            return true;
988
0
        }
989
51.1k
        if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
990
51.1k
            (!tablet->rowset_meta_is_useful(rowset_meta)) &&
991
51.1k
            !check_rowset_id_in_unused_rowsets(rowset_id)) {
992
834
            LOG(INFO) << "rowset meta is not used any more, remove it. rowset_id="
993
834
                      << rowset_meta->rowset_id();
994
834
            invalid_rowset_metas.push_back(rowset_meta);
995
834
        }
996
51.1k
        return true;
997
51.1k
    };
998
50
    std::vector<std::pair<RowsetId, RowsetMetaSharedPtr>> invalid_row_binlog_metas;
999
50
    auto clean_row_binlog_rowsets = [this, &invalid_row_binlog_metas](
1000
50
                                            const TabletUid& tablet_uid, RowsetId rowset_id,
1001
50
                                            RowsetId row_binlog_rowset_id,
1002
50
                                            const std::string& meta_str) -> bool {
1003
        // return false will break meta iterator, return true to skip this error
1004
0
        RowsetMetaSharedPtr row_binlog_rowset_meta(new RowsetMeta());
1005
0
        bool parsed = row_binlog_rowset_meta->init(meta_str);
1006
0
        if (!parsed) {
1007
0
            LOG(WARNING) << "parse binlog<row> meta string failed for rowset_id:"
1008
0
                         << row_binlog_rowset_id;
1009
0
            row_binlog_rowset_meta->set_rowset_id(row_binlog_rowset_id);
1010
0
            row_binlog_rowset_meta->set_tablet_uid(tablet_uid);
1011
0
            invalid_row_binlog_metas.emplace_back(rowset_id, row_binlog_rowset_meta);
1012
0
            return true;
1013
0
        }
1014
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(row_binlog_rowset_meta->tablet_id());
1015
0
        if (tablet == nullptr) {
1016
0
            LOG(INFO) << "failed to find tablet " << row_binlog_rowset_meta->tablet_id()
1017
0
                      << " for binlog<row>: " << row_binlog_rowset_meta->rowset_id()
1018
0
                      << ", tablet may be dropped";
1019
0
            invalid_row_binlog_metas.emplace_back(rowset_id, row_binlog_rowset_meta);
1020
0
            return true;
1021
0
        }
1022
0
        if (tablet->tablet_uid() != row_binlog_rowset_meta->tablet_uid()) {
1023
0
            LOG(WARNING) << "binlog<row> meta's tablet uid " << row_binlog_rowset_meta->tablet_uid()
1024
0
                         << " does not equal to tablet uid: " << tablet->tablet_uid();
1025
0
            invalid_row_binlog_metas.emplace_back(rowset_id, row_binlog_rowset_meta);
1026
0
            return true;
1027
0
        }
1028
0
        if (row_binlog_rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
1029
0
            !tablet->rowset_meta_is_useful(row_binlog_rowset_meta) &&
1030
0
            !check_rowset_id_in_unused_rowsets(rowset_id)) {
1031
0
            LOG(INFO) << "binlog<row> meta is not used any more, remove it. rowset_id="
1032
0
                      << row_binlog_rowset_meta->rowset_id();
1033
0
            invalid_row_binlog_metas.emplace_back(rowset_id, row_binlog_rowset_meta);
1034
0
        }
1035
0
        return true;
1036
0
    };
1037
50
    auto data_dirs = get_stores();
1038
58
    for (auto data_dir : data_dirs) {
1039
58
        static_cast<void>(
1040
58
                RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func));
1041
        // 1. delete delete_bitmap
1042
58
        std::set<int64_t> tablets_to_save_meta;
1043
3.42k
        for (auto& rowset_meta : invalid_rowset_metas) {
1044
3.42k
            TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
1045
3.42k
            if (tablet && tablet->tablet_meta()->enable_unique_key_merge_on_write()) {
1046
830
                tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(),
1047
830
                                                                   rowset_meta->version());
1048
830
                tablets_to_save_meta.emplace(tablet->tablet_id());
1049
830
            }
1050
3.42k
        }
1051
58
        for (const auto& tablet_id : tablets_to_save_meta) {
1052
36
            auto tablet = _tablet_manager->get_tablet(tablet_id);
1053
36
            if (tablet) {
1054
36
                std::shared_lock rlock(tablet->get_header_lock());
1055
36
                tablet->save_meta();
1056
36
            }
1057
36
        }
1058
        // 2. delete rowset meta
1059
3.42k
        for (auto& rowset_meta : invalid_rowset_metas) {
1060
3.42k
            static_cast<void>(RowsetMetaManager::remove(
1061
3.42k
                    data_dir->get_meta(), rowset_meta->tablet_uid(), rowset_meta->rowset_id()));
1062
3.42k
        }
1063
58
        LOG(INFO) << "remove " << invalid_rowset_metas.size()
1064
58
                  << " invalid rowset meta from dir: " << data_dir->path();
1065
1066
58
        static_cast<void>(RowsetMetaManager::traverse_row_binlog_metas(data_dir->get_meta(),
1067
58
                                                                       clean_row_binlog_rowsets));
1068
58
        for (auto& rs_id_to_meta : invalid_row_binlog_metas) {
1069
0
            static_cast<void>(RowsetMetaManager::remove_row_binlog(
1070
0
                    data_dir->get_meta(), rs_id_to_meta.second->tablet_uid(), rs_id_to_meta.first,
1071
0
                    rs_id_to_meta.second->rowset_id()));
1072
0
        }
1073
58
        LOG(INFO) << "remove " << invalid_row_binlog_metas.size()
1074
58
                  << " invalid binlog<row> meta from dir: " << data_dir->path();
1075
58
        invalid_row_binlog_metas.clear();
1076
58
        invalid_rowset_metas.clear();
1077
58
    }
1078
50
}
1079
1080
50
void StorageEngine::_clean_unused_binlog_metas() {
1081
50
    std::vector<std::string> unused_binlog_key_suffixes;
1082
50
    auto unused_binlog_collector = [this, &unused_binlog_key_suffixes](std::string_view key,
1083
50
                                                                       std::string_view value,
1084
248
                                                                       bool need_check) -> bool {
1085
248
        if (need_check) {
1086
248
            BinlogMetaEntryPB binlog_meta_pb;
1087
248
            if (UNLIKELY(!binlog_meta_pb.ParseFromArray(value.data(),
1088
248
                                                        cast_set<int>(value.size())))) {
1089
0
                LOG(WARNING) << "parse rowset meta string failed for binlog meta key: " << key;
1090
248
            } else if (_tablet_manager->get_tablet(binlog_meta_pb.tablet_id()) == nullptr) {
1091
0
                LOG(INFO) << "failed to find tablet " << binlog_meta_pb.tablet_id()
1092
0
                          << " for binlog rowset: " << binlog_meta_pb.rowset_id()
1093
0
                          << ", tablet may be dropped";
1094
248
            } else {
1095
248
                return false;
1096
248
            }
1097
248
        }
1098
1099
0
        unused_binlog_key_suffixes.emplace_back(key.substr(kBinlogMetaPrefix.size()));
1100
0
        return true;
1101
248
    };
1102
50
    auto data_dirs = get_stores();
1103
58
    for (auto data_dir : data_dirs) {
1104
58
        static_cast<void>(RowsetMetaManager::traverse_binlog_metas(data_dir->get_meta(),
1105
58
                                                                   unused_binlog_collector));
1106
58
        for (const auto& suffix : unused_binlog_key_suffixes) {
1107
0
            static_cast<void>(RowsetMetaManager::remove_binlog(data_dir->get_meta(), suffix));
1108
0
        }
1109
58
        LOG(INFO) << "remove " << unused_binlog_key_suffixes.size()
1110
58
                  << " invalid binlog meta from dir: " << data_dir->path();
1111
58
        unused_binlog_key_suffixes.clear();
1112
58
    }
1113
50
}
1114
1115
50
void StorageEngine::_clean_unused_delete_bitmap() {
1116
50
    std::unordered_set<int64_t> removed_tablets;
1117
50
    auto clean_delete_bitmap_func = [this, &removed_tablets](int64_t tablet_id, int64_t version,
1118
1.04k
                                                             std::string_view val) -> bool {
1119
1.04k
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1120
1.04k
        if (tablet == nullptr) {
1121
36
            if (removed_tablets.insert(tablet_id).second) {
1122
34
                LOG(INFO) << "clean ununsed delete bitmap for deleted tablet, tablet_id: "
1123
34
                          << tablet_id;
1124
34
            }
1125
36
        }
1126
1.04k
        return true;
1127
1.04k
    };
1128
50
    auto data_dirs = get_stores();
1129
58
    for (auto data_dir : data_dirs) {
1130
58
        static_cast<void>(TabletMetaManager::traverse_delete_bitmap(data_dir->get_meta(),
1131
58
                                                                    clean_delete_bitmap_func));
1132
58
        for (auto id : removed_tablets) {
1133
34
            static_cast<void>(
1134
34
                    TabletMetaManager::remove_old_version_delete_bitmap(data_dir, id, INT64_MAX));
1135
34
        }
1136
58
        LOG(INFO) << "removed invalid delete bitmap from dir: " << data_dir->path()
1137
58
                  << ", deleted tablets size: " << removed_tablets.size();
1138
58
        removed_tablets.clear();
1139
58
    }
1140
50
}
1141
1142
50
void StorageEngine::_clean_unused_pending_publish_info() {
1143
50
    std::vector<std::pair<int64_t, int64_t>> removed_infos;
1144
50
    auto clean_pending_publish_info_func = [this, &removed_infos](int64_t tablet_id,
1145
50
                                                                  int64_t publish_version,
1146
50
                                                                  std::string_view info) -> bool {
1147
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1148
0
        if (tablet == nullptr) {
1149
0
            removed_infos.emplace_back(tablet_id, publish_version);
1150
0
        }
1151
0
        return true;
1152
0
    };
1153
50
    auto data_dirs = get_stores();
1154
58
    for (auto data_dir : data_dirs) {
1155
58
        static_cast<void>(TabletMetaManager::traverse_pending_publish(
1156
58
                data_dir->get_meta(), clean_pending_publish_info_func));
1157
58
        for (auto& [tablet_id, publish_version] : removed_infos) {
1158
0
            static_cast<void>(TabletMetaManager::remove_pending_publish_info(data_dir, tablet_id,
1159
0
                                                                             publish_version));
1160
0
        }
1161
58
        LOG(INFO) << "removed invalid pending publish info from dir: " << data_dir->path()
1162
58
                  << ", deleted pending publish info size: " << removed_infos.size();
1163
58
        removed_infos.clear();
1164
58
    }
1165
50
}
1166
1167
50
void StorageEngine::_clean_unused_partial_update_info() {
1168
50
    std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos;
1169
50
    auto unused_partial_update_info_collector =
1170
50
            [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id,
1171
50
                                  std::string_view value) -> bool {
1172
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1173
0
        if (tablet == nullptr) {
1174
0
            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
1175
0
            return true;
1176
0
        }
1177
0
        TxnState txn_state =
1178
0
                _txn_manager->get_txn_state(partition_id, txn_id, tablet_id, tablet->tablet_uid());
1179
0
        if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED ||
1180
0
            txn_state == TxnState::DELETED) {
1181
0
            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
1182
0
            return true;
1183
0
        }
1184
0
        return true;
1185
0
    };
1186
50
    auto data_dirs = get_stores();
1187
58
    for (auto* data_dir : data_dirs) {
1188
58
        static_cast<void>(RowsetMetaManager::traverse_partial_update_info(
1189
58
                data_dir->get_meta(), unused_partial_update_info_collector));
1190
58
        static_cast<void>(
1191
58
                RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos));
1192
58
    }
1193
50
}
1194
1195
0
void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) {
1196
0
    for (auto [tablet_id, version] : gc_tablet_infos) {
1197
0
        LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id,
1198
0
                                 version);
1199
1200
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1201
0
        if (tablet == nullptr) {
1202
0
            LOG(WARNING) << fmt::format("tablet_id: {} not found", tablet_id);
1203
0
            continue;
1204
0
        }
1205
0
        tablet->gc_binlogs(version);
1206
0
    }
1207
0
}
1208
1209
50
void StorageEngine::_clean_unused_txns() {
1210
50
    std::set<TabletInfo> tablet_infos;
1211
50
    _txn_manager->get_all_related_tablets(&tablet_infos);
1212
906
    for (auto& tablet_info : tablet_infos) {
1213
906
        TabletSharedPtr tablet =
1214
906
                _tablet_manager->get_tablet(tablet_info.tablet_id, tablet_info.tablet_uid, true);
1215
906
        if (tablet == nullptr) {
1216
            // TODO(ygl) :  should check if tablet still in meta, it's a improvement
1217
            // case 1: tablet still in meta, just remove from memory
1218
            // case 2: tablet not in meta store, remove rowset from meta
1219
            // currently just remove them from memory
1220
            // nullptr to indicate not remove them from meta store
1221
0
            _txn_manager->force_rollback_tablet_related_txns(nullptr, tablet_info.tablet_id,
1222
0
                                                             tablet_info.tablet_uid);
1223
0
        }
1224
906
    }
1225
50
}
1226
1227
// Deletes expired sub-directories of scan_root.
//
// Entries are visited in sorted name order. The part of each entry name before
// the first '.' is parsed as a "%Y%m%d%H%M%S" timestamp. If the name carries a
// further '.'-separated suffix after the second field (eg:
// 20190818221123.3.86400), that suffix overrides `expire` as the timeout in
// seconds. The scan stops at the first entry that has not expired yet.
//
// scan_root: directory to scan; if it does not exist, OK is returned.
// local_now: reference time the entry timestamps are compared against.
// expire:    default timeout in seconds.
// Returns the status left by the last processed entry, or an IO_ERROR when any
// exception is thrown while scanning.
Status StorageEngine::_do_sweep(const std::string& scan_root, const time_t& local_now,
                                const int32_t expire) {
    Status res = Status::OK();
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(scan_root, &exists));
    if (!exists) {
        // dir not existed. no need to sweep trash.
        return res;
    }

    int curr_sweep_batch_size = 0;
    try {
        // Sort pathes by name, that is by delete time.
        std::vector<path> sorted_pathes;
        std::copy(directory_iterator(scan_root), directory_iterator(),
                  std::back_inserter(sorted_pathes));
        std::sort(sorted_pathes.begin(), sorted_pathes.end());
        for (const auto& sorted_path : sorted_pathes) {
            string dir_name = sorted_path.filename().string();
            string str_time = dir_name.substr(0, dir_name.find('.'));
            tm local_tm_create;
            local_tm_create.tm_isdst = 0;
            if (strptime(str_time.c_str(), "%Y%m%d%H%M%S", &local_tm_create) == nullptr) {
                res = Status::Error<OS_ERROR>("fail to strptime time. time={}", str_time);
                continue;
            }

            int32_t actual_expire = expire;
            // try get timeout in dir name, the old snapshot dir does not contain timeout
            // eg: 20190818221123.3.86400, the 86400 is timeout, in second
            size_t pos = dir_name.find('.', str_time.size() + 1);
            if (pos != string::npos) {
                actual_expire = std::stoi(dir_name.substr(pos + 1));
            }
            VLOG_TRACE << "get actual expire time " << actual_expire << " of dir: " << dir_name;

            string path_name = sorted_path.string();
            if (difftime(local_now, mktime(&local_tm_create)) < actual_expire) {
                // Because files are ordered by filename, i.e. by create time, so all the left files are not expired.
                break;
            }

            res = io::global_local_filesystem()->delete_directory(path_name);
            // Fields are separated by spaces so local_now and actual_expire do not run together.
            LOG(INFO) << "do sweep delete directory " << path_name << " local_now " << local_now
                      << " actual_expire " << actual_expire << " res " << res;
            if (!res.ok()) {
                continue;
            }

            // After every garbage_sweep_batch_size deletions, sleep for 1ms.
            curr_sweep_batch_size++;
            if (config::garbage_sweep_batch_size > 0 &&
                curr_sweep_batch_size >= config::garbage_sweep_batch_size) {
                curr_sweep_batch_size = 0;
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
    } catch (...) {
        res = Status::Error<IO_ERROR>("Exception occur when scan directory. path_desc={}",
                                      scan_root);
    }

    return res;
}
1290
1291
// invalid rowset type config will return ALPHA_ROWSET for system to run smoothly
1292
49
void StorageEngine::_parse_default_rowset_type() {
1293
49
    std::string default_rowset_type_config = config::default_rowset_type;
1294
49
    boost::to_upper(default_rowset_type_config);
1295
49
    if (default_rowset_type_config == "BETA") {
1296
49
        _default_rowset_type = BETA_ROWSET;
1297
49
    } else if (default_rowset_type_config == "ALPHA") {
1298
0
        _default_rowset_type = ALPHA_ROWSET;
1299
0
        LOG(WARNING) << "default_rowset_type in be.conf should be set to beta, alpha is not "
1300
0
                        "supported any more";
1301
0
    } else {
1302
0
        LOG(FATAL) << "unknown value " << default_rowset_type_config
1303
0
                   << " in default_rowset_type in be.conf";
1304
0
    }
1305
49
}
1306
1307
283
void StorageEngine::start_delete_unused_rowset() {
1308
283
    DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block", DBUG_BLOCK);
1309
283
    LOG(INFO) << "start to delete unused rowset, size: " << _unused_rowsets.size()
1310
283
              << ", unused delete bitmap size: " << _unused_delete_bitmap.size();
1311
283
    std::vector<RowsetSharedPtr> unused_rowsets_copy;
1312
283
    unused_rowsets_copy.reserve(_unused_rowsets.size());
1313
283
    auto due_to_use_count = 0;
1314
283
    auto due_to_not_delete_file = 0;
1315
283
    auto due_to_delayed_expired_ts = 0;
1316
283
    std::set<int64_t> tablets_to_save_meta;
1317
283
    {
1318
283
        std::lock_guard<std::mutex> lock(_gc_mutex);
1319
31.3k
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
1320
31.0k
            auto&& rs = it->second;
1321
31.0k
            if (rs.use_count() == 1 && rs->need_delete_file()) {
1322
                // remote rowset data will be reclaimed by `remove_unused_remote_files`
1323
31.0k
                if (rs->is_local()) {
1324
31.0k
                    unused_rowsets_copy.push_back(std::move(rs));
1325
31.0k
                }
1326
31.0k
                it = _unused_rowsets.erase(it);
1327
31.0k
            } else {
1328
0
                if (rs.use_count() != 1) {
1329
0
                    ++due_to_use_count;
1330
0
                } else if (!rs->need_delete_file()) {
1331
0
                    ++due_to_not_delete_file;
1332
0
                } else {
1333
0
                    ++due_to_delayed_expired_ts;
1334
0
                }
1335
0
                ++it;
1336
0
            }
1337
31.0k
        }
1338
        // check remove delete bitmaps
1339
283
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
1340
0
            auto tablet_id = std::get<0>(*it);
1341
0
            auto tablet = _tablet_manager->get_tablet(tablet_id);
1342
0
            if (tablet == nullptr) {
1343
0
                it = _unused_delete_bitmap.erase(it);
1344
0
                continue;
1345
0
            }
1346
0
            auto& rowset_ids = std::get<1>(*it);
1347
0
            auto& key_ranges = std::get<2>(*it);
1348
0
            bool find_unused_rowset = false;
1349
0
            for (const auto& rowset_id : rowset_ids) {
1350
0
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
1351
0
                    VLOG_DEBUG << "can not remove pre rowset delete bitmap because rowset is in use"
1352
0
                               << ", tablet_id=" << tablet_id
1353
0
                               << ", rowset_id=" << rowset_id.to_string();
1354
0
                    find_unused_rowset = true;
1355
0
                    break;
1356
0
                }
1357
0
            }
1358
0
            if (find_unused_rowset) {
1359
0
                ++it;
1360
0
                continue;
1361
0
            }
1362
0
            tablet->tablet_meta()->delete_bitmap().remove(key_ranges);
1363
0
            tablets_to_save_meta.emplace(tablet_id);
1364
0
            it = _unused_delete_bitmap.erase(it);
1365
0
        }
1366
283
    }
1367
283
    LOG(INFO) << "collected " << unused_rowsets_copy.size() << " unused rowsets to remove, skipped "
1368
283
              << due_to_use_count << " rowsets due to use count > 1, skipped "
1369
283
              << due_to_not_delete_file << " rowsets due to don't need to delete file, skipped "
1370
283
              << due_to_delayed_expired_ts << " rowsets due to delayed expired timestamp. left "
1371
283
              << _unused_delete_bitmap.size() << " unused delete bitmap.";
1372
31.0k
    for (auto&& rs : unused_rowsets_copy) {
1373
31.0k
        VLOG_NOTICE << "start to remove rowset:" << rs->rowset_id()
1374
0
                    << ", version:" << rs->version();
1375
        // delete delete_bitmap of unused rowsets
1376
31.0k
        if (auto tablet = _tablet_manager->get_tablet(rs->rowset_meta()->tablet_id());
1377
31.0k
            tablet && tablet->enable_unique_key_merge_on_write()) {
1378
20.9k
            tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
1379
20.9k
            tablets_to_save_meta.emplace(tablet->tablet_id());
1380
20.9k
        }
1381
31.0k
        Status status = rs->remove();
1382
31.0k
        unused_rowsets_counter << -1;
1383
31.0k
        VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished. status:" << status;
1384
31.0k
    }
1385
616
    for (const auto& tablet_id : tablets_to_save_meta) {
1386
616
        auto tablet = _tablet_manager->get_tablet(tablet_id);
1387
616
        if (tablet) {
1388
616
            std::shared_lock rlock(tablet->get_header_lock());
1389
616
            tablet->save_meta();
1390
616
        }
1391
616
    }
1392
283
    LOG(INFO) << "removed all collected unused rowsets";
1393
283
}
1394
1395
31.1k
// Queues `rowset` for later garbage collection. A null rowset is ignored, and a rowset
// whose id is already queued is left untouched. A newly queued rowset is flagged
// need-delete-file and closed before it is stored; unused_rowsets_counter is bumped by 1.
void StorageEngine::add_unused_rowset(RowsetSharedPtr rowset) {
    if (rowset == nullptr) {
        return;
    }
    VLOG_NOTICE << "add unused rowset, rowset id:" << rowset->rowset_id()
                << ", version:" << rowset->version();

    std::lock_guard<std::mutex> gc_guard(_gc_mutex);
    const bool already_queued = _unused_rowsets.find(rowset->rowset_id()) != _unused_rowsets.end();
    if (already_queued) {
        return;
    }
    rowset->set_need_delete_file();
    rowset->close();
    _unused_rowsets[rowset->rowset_id()] = std::move(rowset);
    unused_rowsets_counter << 1;
}
1410
1411
void StorageEngine::add_unused_delete_bitmap_key_ranges(int64_t tablet_id,
1412
                                                        const std::vector<RowsetId>& rowsets,
1413
0
                                                        const DeleteBitmapKeyRanges& key_ranges) {
1414
0
    VLOG_NOTICE << "add unused delete bitmap key ranges, tablet id:" << tablet_id;
1415
0
    std::lock_guard<std::mutex> lock(_gc_mutex);
1416
0
    _unused_delete_bitmap.push_back(std::make_tuple(tablet_id, rowsets, key_ranges));
1417
0
}
1418
1419
// TODO(zc): refactor this funciton
1420
7.52k
Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile) {
1421
    // Get all available stores, use ref_root_path if the caller specified
1422
7.52k
    std::vector<DataDir*> stores;
1423
7.52k
    {
1424
7.52k
        SCOPED_TIMER(ADD_TIMER(profile, "GetStores"));
1425
7.52k
        stores = get_stores_for_create_tablet(request.partition_id, request.storage_medium);
1426
7.52k
    }
1427
7.52k
    if (stores.empty()) {
1428
0
        return Status::Error<CE_CMD_PARAMS_ERROR>(
1429
0
                "there is no available disk that can be used to create tablet.");
1430
0
    }
1431
7.52k
    return _tablet_manager->create_tablet(request, stores, profile);
1432
7.52k
}
1433
1434
// Looks up `tablet_id` in the tablet manager. Returns the tablet on success, or an
// InternalError carrying the manager's failure reason. `sync_stats`,
// `force_use_only_cached` and `cache_on_miss` are not used by this implementation.
Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
                                                 bool force_use_only_cached, bool cache_on_miss) {
    std::string failure_reason;
    BaseTabletSPtr found = _tablet_manager->get_tablet(tablet_id, true, &failure_reason);
    if (found != nullptr) {
        return found;
    }
    return unexpected(Status::InternalError("failed to get tablet: {}, reason: {}", tablet_id,
                                            failure_reason));
}
1445
1446
// Writes the meta of tablet `tablet_id` into `*tablet_meta`.
// Returns InvalidArgument when `tablet_meta` is null, or the error from get_tablet()
// when the tablet cannot be obtained.
Status StorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                      bool force_use_only_cached) {
    if (tablet_meta == nullptr) {
        return Status::InvalidArgument("tablet_meta output is null");
    }

    auto lookup = get_tablet(tablet_id, nullptr, force_use_only_cached, true);
    if (lookup.has_value()) {
        *tablet_meta = lookup.value()->tablet_meta();
        return Status::OK();
    }
    return lookup.error();
}
1460
1461
// Picks a data dir for a new tablet and builds a shard directory path inside it.
//
// @param storage_medium  medium used to select candidate data dirs
// @param path_hash       preferred data dir hash; -1 means "no preference"
// @param shard_path      [out] "<store path>/<DATA_PREFIX>/<shard>"
// @param store           [out] the chosen data dir
// @param partition_id    partition used to select candidate data dirs
// @return CE_CMD_PARAMS_ERROR if an output pointer is null, NO_AVAILABLE_ROOT_PATH if
//         no data dir is available, OK otherwise.
//
// If `path_hash` does not match any candidate, the first candidate is used.
Status StorageEngine::obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash,
                                        std::string* shard_path, DataDir** store,
                                        int64_t partition_id) {
    LOG(INFO) << "begin to process obtain root path. storage_medium=" << storage_medium;

    // Both output pointers are dereferenced below, so both must be valid.
    if (shard_path == nullptr || store == nullptr) {
        return Status::Error<CE_CMD_PARAMS_ERROR>(
                "invalid output parameter which is null pointer.");
    }

    auto stores = get_stores_for_create_tablet(partition_id, storage_medium);
    if (stores.empty()) {
        return Status::Error<NO_AVAILABLE_ROOT_PATH>(
                "no available disk can be used to create tablet.");
    }

    *store = nullptr;
    if (path_hash != -1) {
        for (auto* data_dir : stores) {
            if (data_dir->path_hash() == path_hash) {
                *store = data_dir;
                break;
            }
        }
    }
    if (*store == nullptr) {
        // No preference given, or the preferred dir is not among the candidates.
        *store = stores[0];
    }

    uint64_t shard = (*store)->get_shard();

    std::stringstream root_path_stream;
    root_path_stream << (*store)->path() << "/" << DATA_PREFIX << "/" << shard;
    *shard_path = root_path_stream.str();

    LOG(INFO) << "success to process obtain root path. path=" << *shard_path;
    return Status::OK();
}
1499
1500
// Loads the tablet described by `request` from "<shard_path>/<tablet_id>/<schema_hash>".
//
// The owning data dir is derived from `shard_path` by stripping its last two path
// components. Returns INVALID_ROOT_PATH when that data dir is unknown or the path
// cannot be processed; otherwise returns the status of
// TabletManager::load_tablet_from_dir().
Status StorageEngine::load_header(const string& shard_path, const TCloneReq& request,
                                  bool restore) {
    LOG(INFO) << "begin to process load headers."
              << "tablet_id=" << request.tablet_id << ", schema_hash=" << request.schema_hash;

    DataDir* store = nullptr;
    {
        // TODO(zc)
        try {
            auto store_path =
                    std::filesystem::path(shard_path).parent_path().parent_path().string();
            store = get_store(store_path);
            if (store == nullptr) {
                return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path);
            }
        } catch (...) {
            return Status::Error<INVALID_ROOT_PATH>("invalid shard path, path={}", shard_path);
        }
    }

    std::stringstream schema_hash_path_stream;
    schema_hash_path_stream << shard_path << "/" << request.tablet_id << "/" << request.schema_hash;
    // Build the directory path once instead of calling str() repeatedly.
    const std::string schema_hash_path = schema_hash_path_stream.str();

    // not surely, reload and restore tablet action call this api
    // reset tablet uid here
    Status res = _tablet_manager->load_tablet_from_dir(store, request.tablet_id,
                                                       request.schema_hash, schema_hash_path,
                                                       false, restore);
    if (!res.ok()) {
        LOG(WARNING) << "fail to process load headers. res=" << res;
        return res;
    }

    LOG(INFO) << "success to process load headers.";
    return res;
}
1538
1539
29
// Adds `listener` to _report_listeners unless the same pointer is already present.
void BaseStorageEngine::register_report_listener(ReportWorker* listener) {
    std::lock_guard<std::mutex> guard(_report_mtx);
    const auto position = std::find(_report_listeners.begin(), _report_listeners.end(), listener);
    if (position != _report_listeners.end()) [[unlikely]] {
        return;
    }
    _report_listeners.push_back(listener);
}
1547
1548
13
// Removes the first occurrence of `listener` from _report_listeners; does nothing
// when it is not present.
void BaseStorageEngine::deregister_report_listener(ReportWorker* listener) {
    std::lock_guard<std::mutex> guard(_report_mtx);
    auto position = std::find(_report_listeners.begin(), _report_listeners.end(), listener);
    if (position == _report_listeners.end()) {
        return;
    }
    _report_listeners.erase(position);
}
1555
1556
403
void BaseStorageEngine::notify_listeners() {
1557
403
    std::lock_guard<std::mutex> l(_report_mtx);
1558
403
    for (auto& listener : _report_listeners) {
1559
56
        listener->notify();
1560
56
    }
1561
403
}
1562
1563
2
bool BaseStorageEngine::notify_listener(std::string_view name) {
1564
2
    bool found = false;
1565
2
    std::lock_guard<std::mutex> l(_report_mtx);
1566
5
    for (auto& listener : _report_listeners) {
1567
5
        if (listener->name() == name) {
1568
2
            listener->notify();
1569
2
            found = true;
1570
2
        }
1571
5
    }
1572
2
    return found;
1573
2
}
1574
1575
7
void BaseStorageEngine::_evict_quring_rowset_thread_callback() {
1576
7
    int32_t interval = config::quering_rowsets_evict_interval;
1577
402
    do {
1578
402
        _evict_querying_rowset();
1579
402
        interval = config::quering_rowsets_evict_interval;
1580
402
        if (interval <= 0) {
1581
0
            LOG(WARNING) << "quering_rowsets_evict_interval config is illegal: " << interval
1582
0
                         << ", force set to 1";
1583
0
            interval = 1;
1584
0
        }
1585
402
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
1586
7
}
1587
1588
// check whether any unused rowsets's id equal to rowset_id
1589
2.99k
bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) {
1590
2.99k
    std::lock_guard<std::mutex> lock(_gc_mutex);
1591
2.99k
    return _unused_rowsets.contains(rowset_id);
1592
2.99k
}
1593
1594
3.63k
// Registers ctx.rowset_id as pending and returns the guard produced by the registry:
// the local registry for local rowsets, the remote registry otherwise.
PendingRowsetGuard StorageEngine::add_pending_rowset(const RowsetWriterContext& ctx) {
    if (!ctx.is_local_rowset()) {
        return _pending_remote_rowsets.add(ctx.rowset_id);
    }
    return _pending_local_rowsets.add(ctx.rowset_id);
}
1600
1601
bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica,
1602
0
                                          std::string* token) {
1603
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1604
0
    if (tablet == nullptr) {
1605
0
        LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
1606
0
        return false;
1607
0
    }
1608
0
    std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
1609
0
    if (_peer_replica_infos.contains(tablet_id) &&
1610
0
        _peer_replica_infos[tablet_id].replica_id != tablet->replica_id()) {
1611
0
        *replica = _peer_replica_infos[tablet_id];
1612
0
        *token = _token;
1613
0
        return true;
1614
0
    }
1615
0
    return false;
1616
0
}
1617
1618
0
bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) {
1619
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1620
0
    if (tablet == nullptr) {
1621
0
        LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
1622
0
        return false;
1623
0
    }
1624
0
    int64_t cur_time = UnixMillis();
1625
0
    if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) {
1626
0
        LOG_WARNING("failed to get peers replica backens.")
1627
0
                .tag("tablet_id", tablet_id)
1628
0
                .tag("last time", _last_get_peers_replica_backends_time_ms)
1629
0
                .tag("cur time", cur_time);
1630
0
        return false;
1631
0
    }
1632
0
    LOG_INFO("start get peers replica backends info.").tag("tablet id", tablet_id);
1633
0
    ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
1634
0
    if (cluster_info == nullptr) {
1635
0
        LOG(WARNING) << "Have not get FE Master heartbeat yet";
1636
0
        return false;
1637
0
    }
1638
0
    TNetworkAddress master_addr = cluster_info->master_fe_addr;
1639
0
    if (master_addr.hostname.empty() || master_addr.port == 0) {
1640
0
        LOG(WARNING) << "Have not get FE Master heartbeat yet";
1641
0
        return false;
1642
0
    }
1643
0
    TGetTabletReplicaInfosRequest request;
1644
0
    TGetTabletReplicaInfosResult result;
1645
0
    request.tablet_ids.emplace_back(tablet_id);
1646
0
    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
1647
0
            master_addr.hostname, master_addr.port,
1648
0
            [&request, &result](FrontendServiceConnection& client) {
1649
0
                client->getTabletReplicaInfos(result, request);
1650
0
            });
1651
1652
0
    if (!rpc_st.ok()) {
1653
0
        LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
1654
0
                        "tablet id: "
1655
0
                     << tablet_id;
1656
0
        return false;
1657
0
    }
1658
0
    std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
1659
0
    if (result.tablet_replica_infos.contains(tablet_id)) {
1660
0
        std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id];
1661
0
        if (reps.empty()) [[unlikely]] {
1662
0
            VLOG_DEBUG << "get_peers_replica_backends reps is empty, maybe this tablet is in "
1663
0
                          "schema change. Go to FE to see more info. Tablet id: "
1664
0
                       << tablet_id;
1665
0
        }
1666
0
        for (const auto& rep : reps) {
1667
0
            if (rep.replica_id != tablet->replica_id()) {
1668
0
                TBackend backend;
1669
0
                backend.__set_host(rep.host);
1670
0
                backend.__set_be_port(rep.be_port);
1671
0
                backend.__set_http_port(rep.http_port);
1672
0
                backend.__set_brpc_port(rep.brpc_port);
1673
0
                if (rep.__isset.is_alive) {
1674
0
                    backend.__set_is_alive(rep.is_alive);
1675
0
                }
1676
0
                if (rep.__isset.backend_id) {
1677
0
                    backend.__set_id(rep.backend_id);
1678
0
                }
1679
0
                backends->emplace_back(backend);
1680
0
                std::stringstream backend_string;
1681
0
                backend.printTo(backend_string);
1682
0
                LOG_INFO("get 1 peer replica backend info.")
1683
0
                        .tag("tablet id", tablet_id)
1684
0
                        .tag("backend info", backend_string.str());
1685
0
            }
1686
0
        }
1687
0
        _last_get_peers_replica_backends_time_ms = UnixMillis();
1688
0
        LOG_INFO("succeed get peers replica backends info.")
1689
0
                .tag("tablet id", tablet_id)
1690
0
                .tag("replica num", backends->size());
1691
0
        return true;
1692
0
    }
1693
0
    return false;
1694
0
}
1695
1696
0
bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) {
1697
#ifdef BE_TEST
1698
    if (tablet_id % 2 == 0) {
1699
        return true;
1700
    }
1701
    return false;
1702
#endif
1703
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
1704
0
    if (tablet == nullptr) {
1705
0
        LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
1706
0
        return false;
1707
0
    }
1708
0
    std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
1709
0
    if (_peer_replica_infos.contains(tablet_id)) {
1710
0
        return _peer_replica_infos[tablet_id].replica_id != tablet->replica_id();
1711
0
    }
1712
0
    return false;
1713
0
}
1714
1715
// Return json:
1716
// {
1717
//   "CumulativeCompaction": {
1718
//          "/home/disk1" : [10001, 10002],
1719
//          "/home/disk2" : [10003]
1720
//   },
1721
//   "BaseCompaction": {
1722
//          "/home/disk1" : [10001, 10002],
1723
//          "/home/disk2" : [10003]
1724
//   }
1725
// }
1726
1
void StorageEngine::get_compaction_status_json(std::string* result) {
1727
1
    _compaction_submit_registry.jsonfy_compaction_status(result);
1728
1
}
1729
1730
0
// Records `rs` in _querying_rowsets keyed by its rowset id. An existing entry with
// the same id is kept (emplace does not overwrite).
void BaseStorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
    std::lock_guard<std::mutex> guard(_quering_rowsets_mutex);
    const auto& rowset_id = rs->rowset_id();
    _querying_rowsets.emplace(rowset_id, rs);
}
1734
1735
0
// Returns the rowset recorded under `rs_id` in _querying_rowsets, or nullptr when
// there is none.
RowsetSharedPtr BaseStorageEngine::get_quering_rowset(RowsetId rs_id) {
    std::lock_guard<std::mutex> guard(_quering_rowsets_mutex);
    const auto entry = _querying_rowsets.find(rs_id);
    if (entry == _querying_rowsets.end()) {
        return nullptr;
    }
    return entry->second;
}
1743
1744
402
void BaseStorageEngine::_evict_querying_rowset() {
1745
402
    {
1746
402
        std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
1747
402
        for (auto it = _querying_rowsets.begin(); it != _querying_rowsets.end();) {
1748
0
            uint64_t now = UnixSeconds();
1749
            // We delay the GC time of this rowset since it's maybe still needed, see #20732
1750
0
            if (now > it->second->delayed_expired_timestamp()) {
1751
0
                it = _querying_rowsets.erase(it);
1752
0
            } else {
1753
0
                ++it;
1754
0
            }
1755
0
        }
1756
402
    }
1757
1758
402
    uint64_t now = UnixSeconds();
1759
402
    ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now);
1760
402
}
1761
1762
3
bool BaseStorageEngine::_should_delay_large_task() {
1763
3
    DCHECK_GE(_cumu_compaction_thread_pool->max_threads(),
1764
3
              _cumu_compaction_thread_pool_used_threads);
1765
3
    DCHECK_GE(_cumu_compaction_thread_pool_small_tasks_running, 0);
1766
    // Case 1: Multiple threads available => accept large task
1767
3
    if (_cumu_compaction_thread_pool->max_threads() - _cumu_compaction_thread_pool_used_threads >
1768
3
        0) {
1769
1
        return false; // No delay needed
1770
1
    }
1771
    // Case 2: Only one thread left => accept large task only if another small task is already running
1772
2
    if (_cumu_compaction_thread_pool_small_tasks_running > 0) {
1773
1
        return false; // No delay needed
1774
1
    }
1775
    // Case 3: Only one thread left, this is a large task, and no small tasks are running
1776
    // Delay this task to reserve capacity for potential small tasks
1777
1
    return true; // Delay this large task
1778
2
}
1779
1780
5
bool StorageEngine::add_broken_path(std::string path) {
1781
5
    std::lock_guard<std::mutex> lock(_broken_paths_mutex);
1782
5
    auto success = _broken_paths.emplace(path).second;
1783
5
    if (success) {
1784
4
        static_cast<void>(_persist_broken_paths());
1785
4
    }
1786
5
    return success;
1787
5
}
1788
1789
3
bool StorageEngine::remove_broken_path(std::string path) {
1790
3
    std::lock_guard<std::mutex> lock(_broken_paths_mutex);
1791
3
    auto count = _broken_paths.erase(path);
1792
3
    if (count > 0) {
1793
3
        static_cast<void>(_persist_broken_paths());
1794
3
    }
1795
3
    return count > 0;
1796
3
}
1797
1798
7
// Joins all entries of _broken_paths as "p1;p2;...;" and stores the result in the
// "broken_storage_path" config via config::set_config(..., true). Returns the
// set_config status, or OK without touching the config when the set is empty.
Status StorageEngine::_persist_broken_paths() {
    std::string joined_paths;
    for (const std::string& broken : _broken_paths) {
        joined_paths.append(broken).append(";");
    }

    if (joined_paths.empty()) {
        // NOTE(review): with an empty set the config is not rewritten, so a previously
        // persisted value stays as it was — confirm this is intended.
        return Status::OK();
    }

    auto st = config::set_config("broken_storage_path", joined_paths, true);
    LOG(INFO) << "persist broken_storage_path " << joined_paths << st;
    return st;
}
1812
1813
0
// Builds a high-priority CLONE task for `tablet` at `version`, using the peer
// backends from get_peers_replica_backends() as sources, and submits it to the CLONE
// worker pool. Returns INTERNAL_ERROR when the peer backends cannot be obtained, or
// the submission error if submitting fails.
Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) {
    std::vector<TBackend> peer_backends;
    if (!get_peers_replica_backends(tablet->tablet_id(), &peer_backends)) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "get_peers_replica_backends failed.");
    }

    TCloneReq clone_req;
    clone_req.__set_tablet_id(tablet->tablet_id());
    clone_req.__set_schema_hash(tablet->schema_hash());
    clone_req.__set_src_backends(peer_backends);
    clone_req.__set_version(version);
    clone_req.__set_replica_id(tablet->replica_id());
    clone_req.__set_partition_id(tablet->partition_id());
    clone_req.__set_table_id(tablet->table_id());

    TAgentTaskRequest clone_task;
    clone_task.__set_task_type(TTaskType::CLONE);
    clone_task.__set_clone_req(clone_req);
    clone_task.__set_priority(TPriority::HIGH);
    clone_task.__set_signature(tablet->tablet_id());

    LOG_INFO("BE start to submit missing rowset clone task.")
            .tag("tablet_id", tablet->tablet_id())
            .tag("version", version)
            .tag("replica_id", tablet->replica_id())
            .tag("partition_id", tablet->partition_id())
            .tag("table_id", tablet->table_id());

    auto* clone_pool = assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get());
    RETURN_IF_ERROR(clone_pool->submit_high_prior_and_cancel_low(clone_task));
    return Status::OK();
}
1842
1843
7.52k
int CreateTabletRRIdxCache::get_index(const std::string& key) {
1844
7.52k
    auto* lru_handle = lookup(key);
1845
7.52k
    if (lru_handle) {
1846
6.07k
        Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
1847
6.07k
        auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
1848
6.07k
        VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx;
1849
6.07k
        return value->idx;
1850
6.07k
    }
1851
1.45k
    return -1;
1852
7.52k
}
1853
1854
7.52k
void CreateTabletRRIdxCache::set_index(const std::string& key, int next_idx) {
1855
7.52k
    assert(next_idx >= 0);
1856
7.52k
    auto* value = new CacheValue;
1857
7.52k
    value->idx = next_idx;
1858
7.52k
    auto* lru_handle = insert(key, value, 1, sizeof(int), CachePriority::NORMAL);
1859
7.52k
    release(lru_handle);
1860
7.52k
}
1861
} // namespace doris