Coverage Report

Created: 2024-11-21 14:46

/root/doris/be/src/olap/data_dir.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/data_dir.h"
19
20
#include <fmt/core.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <stdio.h>
25
26
#include <atomic>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <chrono> // IWYU pragma: keep
29
#include <cstddef>
30
#include <filesystem>
31
#include <memory>
32
#include <new>
33
#include <roaring/roaring.hh>
34
#include <set>
35
#include <sstream>
36
#include <string>
37
#include <thread>
38
#include <utility>
39
40
#include "common/config.h"
41
#include "common/logging.h"
42
#include "gutil/strings/substitute.h"
43
#include "io/fs/file_reader.h"
44
#include "io/fs/file_writer.h"
45
#include "io/fs/local_file_system.h"
46
#include "io/fs/path.h"
47
#include "io/fs/remote_file_system.h"
48
#include "olap/delete_handler.h"
49
#include "olap/olap_common.h"
50
#include "olap/olap_define.h"
51
#include "olap/olap_meta.h"
52
#include "olap/rowset/beta_rowset.h"
53
#include "olap/rowset/pending_rowset_helper.h"
54
#include "olap/rowset/rowset.h"
55
#include "olap/rowset/rowset_id_generator.h"
56
#include "olap/rowset/rowset_meta.h"
57
#include "olap/rowset/rowset_meta_manager.h"
58
#include "olap/storage_engine.h"
59
#include "olap/storage_policy.h"
60
#include "olap/tablet.h"
61
#include "olap/tablet_manager.h"
62
#include "olap/tablet_meta_manager.h"
63
#include "olap/txn_manager.h"
64
#include "olap/utils.h" // for check_dir_existed
65
#include "service/backend_options.h"
66
#include "util/doris_metrics.h"
67
#include "util/string_util.h"
68
#include "util/uid_util.h"
69
70
using strings::Substitute;
71
72
namespace doris {
73
using namespace ErrorCode;
74
75
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_total_capacity, MetricUnit::BYTES);
76
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_avail_capacity, MetricUnit::BYTES);
77
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_local_used_capacity, MetricUnit::BYTES);
78
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_remote_used_capacity, MetricUnit::BYTES);
79
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_trash_used_capacity, MetricUnit::BYTES);
80
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_state, MetricUnit::BYTES);
81
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_score, MetricUnit::NOUNIT);
82
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_num, MetricUnit::NOUNIT);
83
84
DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
85
                 TStorageMedium::type storage_medium, TabletManager* tablet_manager,
86
                 TxnManager* txn_manager)
87
        : _path(path),
88
          _fs(io::LocalFileSystem::create(path)),
89
          _available_bytes(0),
90
          _disk_capacity_bytes(0),
91
          _trash_used_bytes(0),
92
          _storage_medium(storage_medium),
93
          _is_used(false),
94
          _tablet_manager(tablet_manager),
95
          _txn_manager(txn_manager),
96
          _cluster_id(-1),
97
          _cluster_id_incomplete(false),
98
          _to_be_deleted(false),
99
          _current_shard(0),
100
125
          _meta(nullptr) {
101
125
    _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
102
125
            std::string("data_dir.") + path, {{"path", path}});
103
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity);
104
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
105
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_local_used_capacity);
106
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_remote_used_capacity);
107
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_trash_used_capacity);
108
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
109
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
110
125
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);
111
125
}
112
113
125
DataDir::~DataDir() {
114
125
    DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity);
115
125
    delete _meta;
116
125
}
117
118
116
Status DataDir::init(bool init_meta) {
119
116
    bool exists = false;
120
116
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
121
116
    if (!exists) {
122
10
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
123
10
                                       "check file exist failed");
124
0
    }
125
126
106
    RETURN_NOT_OK_STATUS_WITH_WARN(update_capacity(), "update_capacity failed");
127
106
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
128
106
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity_and_create_shards(),
129
106
                                   "_init_capacity_and_create_shards failed");
130
106
    if (init_meta) {
131
106
        RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
132
106
    }
133
134
106
    _is_used = true;
135
106
    return Status::OK();
136
106
}
137
138
34
void DataDir::stop_bg_worker() {
139
34
    _stop_bg_worker = true;
140
34
}
141
142
106
Status DataDir::_init_cluster_id() {
143
106
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
144
106
    RETURN_IF_ERROR(read_cluster_id(cluster_id_path, &_cluster_id));
145
106
    if (_cluster_id == -1) {
146
106
        _cluster_id_incomplete = true;
147
106
    }
148
106
    return Status::OK();
149
106
}
150
151
106
Status DataDir::read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id) {
152
106
    bool exists = false;
153
106
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
154
106
    *cluster_id = -1;
155
106
    if (exists) {
156
0
        io::FileReaderSPtr reader;
157
0
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
158
0
        size_t fsize = reader->size();
159
0
        if (fsize > 0) {
160
0
            std::string content;
161
0
            content.resize(fsize, '\0');
162
0
            size_t bytes_read = 0;
163
0
            RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
164
0
            DCHECK_EQ(fsize, bytes_read);
165
0
            *cluster_id = std::stoi(content);
166
0
        }
167
0
    }
168
106
    return Status::OK();
169
106
}
170
171
106
Status DataDir::_init_capacity_and_create_shards() {
172
106
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
173
106
                                                                  &_available_bytes));
174
106
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
175
106
    bool exists = false;
176
106
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(data_path, &exists));
177
106
    if (!exists) {
178
56
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(data_path));
179
56
    }
180
108k
    for (int i = 0; i < MAX_SHARD_NUM; ++i) {
181
108k
        auto shard_path = fmt::format("{}/{}", data_path, i);
182
108k
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(shard_path, &exists));
183
108k
        if (!exists) {
184
57.3k
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(shard_path));
185
57.3k
        }
186
108k
    }
187
188
106
    return Status::OK();
189
106
}
190
191
107
Status DataDir::_init_meta() {
192
    // init path hash
193
107
    _path_hash = hash_of_path(BackendOptions::get_localhost(), _path);
194
107
    LOG(INFO) << "path: " << _path << ", hash: " << _path_hash;
195
196
    // init meta
197
107
    _meta = new (std::nothrow) OlapMeta(_path);
198
107
    if (_meta == nullptr) {
199
0
        RETURN_NOT_OK_STATUS_WITH_WARN(
200
0
                Status::MemoryAllocFailed("allocate memory for OlapMeta failed"),
201
0
                "new OlapMeta failed");
202
0
    }
203
107
    Status res = _meta->init();
204
107
    if (!res.ok()) {
205
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("open rocksdb failed, path={}", _path),
206
0
                                       "init OlapMeta failed");
207
0
    }
208
107
    return Status::OK();
209
107
}
210
211
0
Status DataDir::set_cluster_id(int32_t cluster_id) {
212
0
    if (_cluster_id != -1 && _cluster_id != cluster_id) {
213
0
        LOG(ERROR) << "going to set cluster id to already assigned store, cluster_id="
214
0
                   << _cluster_id << ", new_cluster_id=" << cluster_id;
215
0
        return Status::InternalError("going to set cluster id to already assigned store");
216
0
    }
217
0
    if (!_cluster_id_incomplete) {
218
0
        return Status::OK();
219
0
    }
220
0
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
221
0
    return _write_cluster_id_to_path(cluster_id_path, cluster_id);
222
0
}
223
224
0
Status DataDir::_write_cluster_id_to_path(const std::string& path, int32_t cluster_id) {
225
0
    std::stringstream cluster_id_ss;
226
0
    cluster_id_ss << cluster_id;
227
0
    bool exists = false;
228
0
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(path, &exists));
229
0
    if (!exists) {
230
0
        io::FileWriterPtr file_writer;
231
0
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &file_writer));
232
0
        RETURN_IF_ERROR(file_writer->append(cluster_id_ss.str()));
233
0
        RETURN_IF_ERROR(file_writer->close());
234
0
    }
235
0
    return Status::OK();
236
0
}
237
238
26
void DataDir::health_check() {
239
    // check disk
240
26
    if (_is_used) {
241
26
        Status res = _read_and_write_test_file();
242
26
        if (!res && res.is<IO_ERROR>()) {
243
0
            LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path
244
0
                         << ", err: " << res;
245
0
            StorageEngine::instance()->add_broken_path(_path);
246
0
            _is_used = !res.is<IO_ERROR>();
247
0
        }
248
26
    }
249
26
    disks_state->set_value(_is_used ? 1 : 0);
250
26
}
251
252
26
Status DataDir::_read_and_write_test_file() {
253
26
    auto test_file = fmt::format("{}/{}", _path, kTestFilePath);
254
26
    return read_write_test_file(test_file);
255
26
}
256
257
71
uint64_t DataDir::get_shard() {
258
71
    uint32_t next_shard = 0;
259
71
    {
260
71
        std::lock_guard<std::mutex> l(_mutex);
261
71
        next_shard = _current_shard;
262
71
        _current_shard = (_current_shard + 1) % MAX_SHARD_NUM;
263
71
    }
264
71
    return next_shard;
265
71
}
266
267
71
void DataDir::register_tablet(Tablet* tablet) {
268
71
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
269
270
71
    std::lock_guard<std::mutex> l(_mutex);
271
71
    _tablet_set.emplace(std::move(tablet_info));
272
71
}
273
274
25
void DataDir::deregister_tablet(Tablet* tablet) {
275
25
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
276
277
25
    std::lock_guard<std::mutex> l(_mutex);
278
25
    _tablet_set.erase(tablet_info);
279
25
}
280
281
0
void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) {
282
0
    std::lock_guard<std::mutex> l(_mutex);
283
284
0
    tablet_infos->insert(tablet_infos->end(), _tablet_set.begin(), _tablet_set.end());
285
0
    _tablet_set.clear();
286
0
}
287
288
0
std::string DataDir::get_absolute_shard_path(int64_t shard_id) {
289
0
    return fmt::format("{}/{}/{}", _path, DATA_PREFIX, shard_id);
290
0
}
291
292
std::string DataDir::get_absolute_tablet_path(int64_t shard_id, int64_t tablet_id,
293
0
                                              int32_t schema_hash) {
294
0
    return fmt::format("{}/{}/{}", get_absolute_shard_path(shard_id), tablet_id, schema_hash);
295
0
}
296
297
0
void DataDir::find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* paths) {
298
    // path: /root_path/trash/time_label/tablet_id/schema_hash
299
0
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
300
0
    bool exists = true;
301
0
    std::vector<io::FileInfo> sub_dirs;
302
0
    Status st = io::global_local_filesystem()->list(trash_path, false, &sub_dirs, &exists);
303
0
    if (!st) {
304
0
        return;
305
0
    }
306
307
0
    for (auto& sub_dir : sub_dirs) {
308
        // sub dir is time_label
309
0
        if (sub_dir.is_file) {
310
0
            continue;
311
0
        }
312
0
        auto sub_path = fmt::format("{}/{}", trash_path, sub_dir.file_name);
313
0
        auto tablet_path = fmt::format("{}/{}", sub_path, tablet_id);
314
0
        st = io::global_local_filesystem()->exists(tablet_path, &exists);
315
0
        if (st && exists) {
316
0
            paths->emplace_back(std::move(tablet_path));
317
0
        }
318
0
    }
319
0
}
320
321
std::string DataDir::get_root_path_from_schema_hash_path_in_trash(
322
0
        const std::string& schema_hash_dir_in_trash) {
323
0
    return io::Path(schema_hash_dir_in_trash)
324
0
            .parent_path()
325
0
            .parent_path()
326
0
            .parent_path()
327
0
            .parent_path()
328
0
            .string();
329
0
}
330
331
34
Status DataDir::_check_incompatible_old_format_tablet() {
332
34
    auto check_incompatible_old_func = [](int64_t tablet_id, int32_t schema_hash,
333
34
                                          const std::string& value) -> bool {
334
        // if strict check incompatible old format, then log fatal
335
0
        if (config::storage_strict_check_incompatible_old_format) {
336
0
            LOG(FATAL)
337
0
                    << "There are incompatible old format metas, current version does not support "
338
0
                    << "and it may lead to data missing!!! "
339
0
                    << "tablet_id = " << tablet_id << " schema_hash = " << schema_hash;
340
0
        } else {
341
0
            LOG(WARNING)
342
0
                    << "There are incompatible old format metas, current version does not support "
343
0
                    << "and it may lead to data missing!!! "
344
0
                    << "tablet_id = " << tablet_id << " schema_hash = " << schema_hash;
345
0
        }
346
0
        return false;
347
0
    };
348
349
    // seek the old header prefix. If check_incompatible_old_func is invoked, old-format metas exist in olap_meta
350
34
    Status check_incompatible_old_status = TabletMetaManager::traverse_headers(
351
34
            _meta, check_incompatible_old_func, OLD_HEADER_PREFIX);
352
34
    if (!check_incompatible_old_status) {
353
0
        LOG(WARNING) << "check incompatible old format meta fails, it may lead to data missing!!! "
354
0
                     << _path;
355
34
    } else {
356
34
        LOG(INFO) << "successfully check incompatible old format meta " << _path;
357
34
    }
358
34
    return check_incompatible_old_status;
359
34
}
360
361
// TODO(ygl): deal with rowsets and tablets when load failed
362
34
Status DataDir::load() {
363
34
    LOG(INFO) << "start to load tablets from " << _path;
364
365
    // load rowset meta from meta env and create rowset
366
    // COMMITTED: add to txn manager
367
    // VISIBLE: add to tablet
368
    // if loading one rowset fails, the whole data dir will not be loaded
369
370
    // must check the incompatible old format first; loading old metas may lead to data loss
371
34
    RETURN_IF_ERROR(_check_incompatible_old_format_tablet());
372
373
34
    std::vector<RowsetMetaSharedPtr> dir_rowset_metas;
374
34
    LOG(INFO) << "begin loading rowset from meta";
375
34
    auto load_rowset_func = [&dir_rowset_metas, &local_fs = fs(), this](
376
34
                                    TabletUid tablet_uid, RowsetId rowset_id,
377
34
                                    const std::string& meta_str) -> bool {
378
0
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
379
0
        bool parsed = rowset_meta->init(meta_str);
380
0
        if (!parsed) {
381
0
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
382
            // returning false would break the meta iterator, so return true to skip this error
383
0
            return true;
384
0
        }
385
0
        if (rowset_meta->is_local()) {
386
0
            rowset_meta->set_fs(local_fs);
387
0
        }
388
0
        if (rowset_meta->has_delete_predicate()) {
389
            // copy the delete sub pred v1 so it can be checked after the conversion
390
0
            auto orig_delete_sub_pred = rowset_meta->delete_predicate().sub_predicates();
391
0
            auto* delete_pred = rowset_meta->mutable_delete_pred_pb();
392
393
0
            if ((!delete_pred->sub_predicates().empty() &&
394
0
                 delete_pred->sub_predicates_v2().empty()) ||
395
0
                (!delete_pred->in_predicates().empty() &&
396
0
                 delete_pred->in_predicates()[0].has_column_unique_id())) {
397
                // convert the pred and write it back only when delete sub pred v2 is not set, or an IN predicate needs its column unique id set
398
0
                DeleteHandler::convert_to_sub_pred_v2(delete_pred, rowset_meta->tablet_schema());
399
0
                LOG(INFO) << fmt::format(
400
0
                        "convert rowset with old delete pred: rowset_id={}, tablet_id={}",
401
0
                        rowset_id.to_string(), tablet_uid.to_string());
402
0
                CHECK_EQ(orig_delete_sub_pred.size(), delete_pred->sub_predicates().size())
403
0
                        << "inconsistent sub predicate v1 after conversion";
404
0
                for (size_t i = 0; i < orig_delete_sub_pred.size(); ++i) {
405
0
                    CHECK_STREQ(orig_delete_sub_pred.Get(i).c_str(),
406
0
                                delete_pred->sub_predicates().Get(i).c_str())
407
0
                            << "inconsistent sub predicate v1 after conversion";
408
0
                }
409
0
                std::string result;
410
0
                rowset_meta->serialize(&result);
411
0
                std::string key =
412
0
                        ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
413
0
                RETURN_IF_ERROR(_meta->put(META_COLUMN_FAMILY_INDEX, key, result));
414
0
            }
415
0
        }
416
417
0
        if (rowset_meta->partition_id() == 0) {
418
0
            LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
419
0
                         << " load from meta but partition id eq 0";
420
0
        }
421
422
0
        dir_rowset_metas.push_back(rowset_meta);
423
0
        return true;
424
0
    };
425
34
    Status load_rowset_status = RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
426
427
34
    if (!load_rowset_status) {
428
0
        LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
429
34
    } else {
430
34
        LOG(INFO) << "load rowset from meta finished, data dir: " << _path;
431
34
    }
432
433
    // load tablet
434
    // create tablet from tablet meta and add it to tablet mgr
435
34
    LOG(INFO) << "begin loading tablet from meta";
436
34
    std::set<int64_t> tablet_ids;
437
34
    std::set<int64_t> failed_tablet_ids;
438
34
    auto load_tablet_func = [this, &tablet_ids, &failed_tablet_ids](
439
34
                                    int64_t tablet_id, int32_t schema_hash,
440
34
                                    const std::string& value) -> bool {
441
0
        Status status = _tablet_manager->load_tablet_from_meta(this, tablet_id, schema_hash, value,
442
0
                                                               false, false, false, false);
443
0
        if (!status.ok() && !status.is<TABLE_ALREADY_DELETED_ERROR>() &&
444
0
            !status.is<ENGINE_INSERT_OLD_TABLET>()) {
445
            // load_tablet_from_meta() may return Status::Error<TABLE_ALREADY_DELETED_ERROR>()
446
            // which means the tablet status is DELETED
447
            // This may happen when the tablet was just deleted before the BE restarted,
448
            // but it has not been cleared from rocksdb. At this time, restarting the BE
449
            // will read the tablet in the DELETE state from rocksdb. These tablets have been
450
            // added to the garbage collection queue and will be automatically deleted afterwards.
451
            // Therefore, we believe that this situation is not a failure.
452
453
            // Besides, load_tablet_from_meta() may return Status::Error<ENGINE_INSERT_OLD_TABLET>()
454
            // when BE is restarting and the older tablet have been added to the
455
            // garbage collection queue but not deleted yet.
456
            // In this case, since the data_dirs are parallel loaded, a later loaded tablet
457
            // may be older than previously loaded one, which should not be acknowledged as a
458
            // failure.
459
0
            LOG(WARNING) << "load tablet from header failed. status:" << status
460
0
                         << ", tablet=" << tablet_id << "." << schema_hash;
461
0
            failed_tablet_ids.insert(tablet_id);
462
0
        } else {
463
0
            tablet_ids.insert(tablet_id);
464
0
        }
465
0
        return true;
466
0
    };
467
34
    Status load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
468
34
    if (failed_tablet_ids.size() != 0) {
469
0
        LOG(WARNING) << "load tablets from header failed"
470
0
                     << ", loaded tablet: " << tablet_ids.size()
471
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
472
0
        if (!config::ignore_load_tablet_failure) {
473
0
            LOG(FATAL) << "load tablets encounter failure. stop BE process. path: " << _path;
474
0
        }
475
0
    }
476
34
    if (!load_tablet_status) {
477
0
        LOG(WARNING) << "there is failure when loading tablet headers"
478
0
                     << ", loaded tablet: " << tablet_ids.size()
479
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
480
34
    } else {
481
34
        LOG(INFO) << "load tablet from meta finished"
482
34
                  << ", loaded tablet: " << tablet_ids.size()
483
34
                  << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
484
34
    }
485
486
34
    for (int64_t tablet_id : tablet_ids) {
487
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
488
0
        if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
489
0
            RETURN_IF_ERROR(TabletMetaManager::save(this, tablet->tablet_id(),
490
0
                                                    tablet->schema_hash(), tablet->tablet_meta()));
491
0
        }
492
0
    }
493
494
34
    auto load_pending_publish_info_func = [](int64_t tablet_id, int64_t publish_version,
495
34
                                             const string& info) {
496
0
        PendingPublishInfoPB pending_publish_info_pb;
497
0
        bool parsed = pending_publish_info_pb.ParseFromString(info);
498
0
        if (!parsed) {
499
0
            LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
500
0
                         << " publish_version: " << publish_version;
501
0
        }
502
0
        StorageEngine::instance()->add_async_publish_task(
503
0
                pending_publish_info_pb.partition_id(), tablet_id, publish_version,
504
0
                pending_publish_info_pb.transaction_id(), true);
505
0
        return true;
506
0
    };
507
34
    RETURN_IF_ERROR(
508
34
            TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func));
509
510
34
    int64_t rowset_partition_id_eq_0_num = 0;
511
34
    for (auto rowset_meta : dir_rowset_metas) {
512
0
        if (rowset_meta->partition_id() == 0) {
513
0
            ++rowset_partition_id_eq_0_num;
514
0
        }
515
0
    }
516
34
    if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
517
0
        LOG(FATAL) << fmt::format(
518
0
                "roswet partition id eq 0 is {} bigger than config {}, be exit, plz check be.INFO",
519
0
                rowset_partition_id_eq_0_num, config::ignore_invalid_partition_id_rowset_num);
520
0
        exit(-1);
521
0
    }
522
523
    // traverse rowset
524
    // 1. add committed rowset to txn map
525
    // 2. add visible rowset to tablet
526
    // ignore any errors when loading tablets or rowsets, because FE will repair them after the report
527
34
    int64_t invalid_rowset_counter = 0;
528
34
    for (auto&& rowset_meta : dir_rowset_metas) {
529
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(rowset_meta->tablet_id());
530
        // the tablet may have been dropped without its related rowset meta being dropped
531
0
        if (tablet == nullptr) {
532
0
            VLOG_NOTICE << "could not find tablet id: " << rowset_meta->tablet_id()
533
0
                        << ", schema hash: " << rowset_meta->tablet_schema_hash()
534
0
                        << ", for rowset: " << rowset_meta->rowset_id() << ", skip this rowset";
535
0
            ++invalid_rowset_counter;
536
0
            continue;
537
0
        }
538
539
0
        if (rowset_meta->partition_id() == 0) {
540
0
            LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
541
0
                         << " rowset: " << rowset_meta->rowset_id()
542
0
                         << " txn: " << rowset_meta->txn_id();
543
0
            continue;
544
0
        }
545
546
0
        RowsetSharedPtr rowset;
547
0
        Status create_status = tablet->create_rowset(rowset_meta, &rowset);
548
0
        if (!create_status) {
549
0
            LOG(WARNING) << "could not create rowset from rowsetmeta: "
550
0
                         << " rowset_id: " << rowset_meta->rowset_id()
551
0
                         << " rowset_type: " << rowset_meta->rowset_type()
552
0
                         << " rowset_state: " << rowset_meta->rowset_state();
553
0
            continue;
554
0
        }
555
0
        if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
556
0
            rowset_meta->tablet_uid() == tablet->tablet_uid()) {
557
0
            if (!rowset_meta->tablet_schema()) {
558
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
559
0
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
560
0
                                                        rowset_meta->rowset_id(),
561
0
                                                        rowset_meta->get_rowset_pb(), false));
562
0
            }
563
0
            Status commit_txn_status = _txn_manager->commit_txn(
564
0
                    _meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
565
0
                    rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
566
0
                    rowset,
567
0
                    StorageEngine::instance()->pending_local_rowsets().add(
568
0
                            rowset_meta->rowset_id()),
569
0
                    true);
570
0
            if (commit_txn_status || commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
571
0
                LOG(INFO) << "successfully to add committed rowset: " << rowset_meta->rowset_id()
572
0
                          << " to tablet: " << rowset_meta->tablet_id()
573
0
                          << " schema hash: " << rowset_meta->tablet_schema_hash()
574
0
                          << " for txn: " << rowset_meta->txn_id();
575
576
0
            } else if (commit_txn_status.is<ErrorCode::INTERNAL_ERROR>()) {
577
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
578
0
                             << " to tablet: " << rowset_meta->tablet_id()
579
0
                             << " for txn: " << rowset_meta->txn_id()
580
0
                             << " error: " << commit_txn_status;
581
0
                return commit_txn_status;
582
0
            } else {
583
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
584
0
                             << " to tablet: " << rowset_meta->tablet_id()
585
0
                             << " for txn: " << rowset_meta->txn_id()
586
0
                             << " error: " << commit_txn_status;
587
0
            }
588
0
        } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
589
0
                   rowset_meta->tablet_uid() == tablet->tablet_uid()) {
590
0
            if (!rowset_meta->tablet_schema()) {
591
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
592
0
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
593
0
                                                        rowset_meta->rowset_id(),
594
0
                                                        rowset_meta->get_rowset_pb(), false));
595
0
            }
596
0
            Status publish_status = tablet->add_rowset(rowset);
597
0
            if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
598
0
                LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
599
0
                             << rowset->rowset_id() << " tablet id: " << rowset_meta->tablet_id()
600
0
                             << " txn id:" << rowset_meta->txn_id()
601
0
                             << " start_version: " << rowset_meta->version().first
602
0
                             << " end_version: " << rowset_meta->version().second;
603
0
            }
604
0
        } else {
605
0
            LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()
606
0
                         << " with tablet id: " << rowset_meta->tablet_id()
607
0
                         << " tablet uid: " << rowset_meta->tablet_uid()
608
0
                         << " schema hash: " << rowset_meta->tablet_schema_hash()
609
0
                         << " txn: " << rowset_meta->txn_id()
610
0
                         << " current valid tablet uid: " << tablet->tablet_uid();
611
0
            ++invalid_rowset_counter;
612
0
        }
613
0
    }
614
615
34
    auto load_delete_bitmap_func = [this](int64_t tablet_id, int64_t version, const string& val) {
616
0
        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
617
0
        if (!tablet) {
618
0
            return true;
619
0
        }
620
0
        const std::vector<RowsetMetaSharedPtr>& all_rowsets = tablet->tablet_meta()->all_rs_metas();
621
0
        RowsetIdUnorderedSet rowset_ids;
622
0
        for (auto& rowset_meta : all_rowsets) {
623
0
            rowset_ids.insert(rowset_meta->rowset_id());
624
0
        }
625
626
0
        DeleteBitmapPB delete_bitmap_pb;
627
0
        delete_bitmap_pb.ParseFromString(val);
628
0
        int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
629
0
        int seg_ids_size = delete_bitmap_pb.segment_ids_size();
630
0
        int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
631
0
        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);
632
633
0
        for (size_t i = 0; i < rst_ids_size; ++i) {
634
0
            RowsetId rst_id;
635
0
            rst_id.init(delete_bitmap_pb.rowset_ids(i));
636
            // only process the rowset in _rs_metas
637
0
            if (rowset_ids.find(rst_id) == rowset_ids.end()) {
638
0
                continue;
639
0
            }
640
0
            auto seg_id = delete_bitmap_pb.segment_ids(i);
641
0
            auto iter = tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
642
0
                    {rst_id, seg_id, version});
643
            // This version of delete bitmap already exists
644
0
            if (iter != tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
645
0
                continue;
646
0
            }
647
0
            auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
648
0
            tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
649
0
                    roaring::Roaring::read(bitmap);
650
0
        }
651
0
        return true;
652
0
    };
653
34
    RETURN_IF_ERROR(TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func));
654
655
    // At startup, we only count these invalid rowsets, but do not actually delete them.
656
    // The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
657
    // which is cleaned up uniformly by the background cleanup thread.
658
34
    LOG(INFO) << "finish to load tablets from " << _path
659
34
              << ", total rowset meta: " << dir_rowset_metas.size()
660
34
              << ", invalid rowset num: " << invalid_rowset_counter;
661
662
34
    return Status::OK();
663
34
}
664
665
// gc unused local tablet dir
666
43
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) {
667
43
    if (_stop_bg_worker) {
668
0
        return;
669
0
    }
670
671
43
    TTabletId tablet_id = -1;
672
43
    TSchemaHash schema_hash = -1;
673
43
    bool is_valid = TabletManager::get_tablet_id_and_schema_hash_from_path(
674
43
            tablet_schema_hash_path, &tablet_id, &schema_hash);
675
43
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
676
0
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
677
0
        return;
678
0
    }
679
680
43
    auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
681
43
    if (!tablet || tablet->data_dir() != this) {
682
10
        if (tablet) {
683
0
            LOG(INFO) << "The tablet in path " << tablet_schema_hash_path
684
0
                      << " is not same with the running one: " << tablet->tablet_path()
685
0
                      << ", might be the old tablet after migration, try to move it to trash";
686
0
        }
687
10
        StorageEngine::instance()->tablet_manager()->try_delete_unused_tablet_path(
688
10
                this, tablet_id, schema_hash, tablet_schema_hash_path, shard_id);
689
10
        return;
690
10
    }
691
692
33
    _perform_rowset_gc(tablet_schema_hash_path);
693
33
}
694
695
// gc unused local rowsets under tablet dir
696
33
void DataDir::_perform_rowset_gc(const std::string& tablet_schema_hash_path) {
697
33
    if (_stop_bg_worker) {
698
0
        return;
699
0
    }
700
701
33
    TTabletId tablet_id = -1;
702
33
    TSchemaHash schema_hash = -1;
703
33
    bool is_valid = doris::TabletManager::get_tablet_id_and_schema_hash_from_path(
704
33
            tablet_schema_hash_path, &tablet_id, &schema_hash);
705
33
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
706
0
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
707
0
        return;
708
0
    }
709
710
33
    auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
711
33
    if (!tablet) {
712
        // Could not find the tablet, maybe it's a dropped tablet that will be reclaimed
713
        // the next time `_perform_path_gc_by_tablet` runs
714
0
        return;
715
0
    }
716
717
33
    if (tablet->data_dir() != this) {
718
        // The currently running tablet is not in the same data_dir, maybe it's a tablet left after migration,
719
        // and will be reclaimed the next time `_perform_path_gc_by_tablet` runs
720
0
        return;
721
0
    }
722
723
33
    bool exists;
724
33
    std::vector<io::FileInfo> files;
725
33
    auto st = io::global_local_filesystem()->list(tablet_schema_hash_path, true, &files, &exists);
726
33
    if (!st.ok()) [[unlikely]] {
727
0
        LOG(WARNING) << "[path gc] fail to list tablet path " << tablet_schema_hash_path << " : "
728
0
                     << st;
729
0
        return;
730
0
    }
731
732
    // Rowset files excluding pending rowsets
733
33
    std::vector<std::pair<RowsetId, std::string /* filename */>> rowsets_not_pending;
734
481
    for (auto&& file : files) {
735
481
        auto rowset_id = extract_rowset_id(file.file_name);
736
481
        if (rowset_id.hi == 0) {
737
0
            continue; // Not a rowset
738
0
        }
739
740
481
        if (StorageEngine::instance()->pending_local_rowsets().contains(rowset_id)) {
741
81
            continue; // Pending rowset file
742
81
        }
743
744
400
        rowsets_not_pending.emplace_back(rowset_id, std::move(file.file_name));
745
400
    }
746
747
33
    RowsetIdUnorderedSet rowsets_in_version_map;
748
33
    tablet->traverse_rowsets(
749
54
            [&rowsets_in_version_map](auto& rs) { rowsets_in_version_map.insert(rs->rowset_id()); },
750
33
            true);
751
752
80
    auto reclaim_rowset_file = [](const std::string& path) {
753
80
        auto st = io::global_local_filesystem()->delete_file(path);
754
80
        if (!st.ok()) [[unlikely]] {
755
0
            LOG(WARNING) << "[path gc] failed to delete garbage rowset file: " << st;
756
0
            return;
757
0
        }
758
80
        LOG(INFO) << "[path gc] delete garbage path: " << path; // Audit log
759
80
    };
760
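    // A rowset file may be reclaimed only if it is not referenced by the tablet's version map,
    // is not tracked by the storage engine as an unused rowset, and its rowset meta entry is
    // already gone from the meta store (META_KEY_NOT_FOUND).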
761
100
    auto should_reclaim = [&, this](const RowsetId& rowset_id) {
762
100
        return !rowsets_in_version_map.contains(rowset_id) &&
763
100
               !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id) &&
764
100
               RowsetMetaManager::exists(get_meta(), tablet->tablet_uid(), rowset_id)
765
40
                       .is<META_KEY_NOT_FOUND>();
766
100
    };
767
768
    // rowset_id -> is_garbage
769
33
    std::unordered_map<RowsetId, bool> checked_rowsets;
770
400
    for (auto&& [rowset_id, filename] : rowsets_not_pending) {
771
400
        if (_stop_bg_worker) {
772
0
            return;
773
0
        }
774
775
400
        if (auto it = checked_rowsets.find(rowset_id); it != checked_rowsets.end()) {
776
300
            if (it->second) { // Is checked garbage rowset
777
60
                reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
778
60
            }
779
300
            continue;
780
300
        }
781
782
100
        if (should_reclaim(rowset_id)) {
783
20
            if (config::path_gc_check_step > 0 &&
784
20
                ++_path_gc_step % config::path_gc_check_step == 0) {
785
0
                std::this_thread::sleep_for(
786
0
                        std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
787
0
            }
788
20
            reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
789
20
            checked_rowsets.emplace(rowset_id, true);
790
80
        } else {
791
80
            checked_rowsets.emplace(rowset_id, false);
792
80
        }
793
100
    }
794
33
}
795
796
19
void DataDir::perform_path_gc() {
797
19
    if (_stop_bg_worker) {
798
0
        return;
799
0
    }
800
801
19
    LOG(INFO) << "start to gc data dir " << _path;
802
19
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
803
19
    std::vector<io::FileInfo> shards;
804
19
    bool exists = true;
805
19
    const auto& fs = io::global_local_filesystem();
806
19
    auto st = fs->list(data_path, false, &shards, &exists);
807
19
    if (!st.ok()) [[unlikely]] {
808
0
        LOG(WARNING) << "failed to scan data dir: " << st;
809
0
        return;
810
0
    }
811
812
13.2k
    for (const auto& shard : shards) {
813
13.2k
        if (_stop_bg_worker) {
814
11
            break;
815
11
        }
816
817
13.2k
        if (shard.is_file) {
818
0
            continue;
819
0
        }
820
821
13.2k
        auto shard_path = fmt::format("{}/{}", data_path, shard.file_name);
822
13.2k
        std::vector<io::FileInfo> tablet_ids;
823
13.2k
        st = io::global_local_filesystem()->list(shard_path, false, &tablet_ids, &exists);
824
13.2k
        if (!st.ok()) [[unlikely]] {
825
0
            LOG(WARNING) << "fail to walk dir, shard_path=" << shard_path << " : " << st;
826
0
            continue;
827
0
        }
828
829
13.2k
        for (const auto& tablet_id : tablet_ids) {
830
43
            if (_stop_bg_worker) {
831
0
                break;
832
0
            }
833
834
43
            if (tablet_id.is_file) {
835
0
                continue;
836
0
            }
837
838
43
            auto tablet_id_path = fmt::format("{}/{}", shard_path, tablet_id.file_name);
839
43
            std::vector<io::FileInfo> schema_hashes;
840
43
            st = fs->list(tablet_id_path, false, &schema_hashes, &exists);
841
43
            if (!st.ok()) [[unlikely]] {
842
0
                LOG(WARNING) << "fail to walk dir, tablet_id_path=" << tablet_id_path << " : "
843
0
                             << st;
844
0
                continue;
845
0
            }
846
847
43
            for (auto&& schema_hash : schema_hashes) {
848
43
                if (schema_hash.is_file) {
849
0
                    continue;
850
0
                }
851
852
43
                if (config::path_gc_check_step > 0 &&
853
43
                    ++_path_gc_step % config::path_gc_check_step == 0) {
854
0
                    std::this_thread::sleep_for(
855
0
                            std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
856
0
                }
857
43
                int16_t shard_id = -1;
858
43
                try {
859
43
                    shard_id = std::stoi(shard.file_name);
860
43
                } catch (const std::exception&) {
861
0
                    LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name;
862
0
                    continue;
863
0
                }
864
43
                _perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id);
865
43
            }
866
43
        }
867
13.2k
    }
868
869
19
    LOG(INFO) << "gc data dir path: " << _path << " finished";
870
19
}
871
872
124
Status DataDir::update_capacity() {
873
124
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
874
124
                                                                  &_available_bytes));
875
114
    disks_total_capacity->set_value(_disk_capacity_bytes);
876
114
    disks_avail_capacity->set_value(_available_bytes);
877
114
    LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
878
114
              << ", available capacity: " << _available_bytes << ", usage: " << get_usage(0)
879
114
              << ", in_use: " << is_used();
880
881
114
    return Status::OK();
882
124
}
883
884
18
void DataDir::update_trash_capacity() {
885
18
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
886
18
    try {
887
18
        _trash_used_bytes = StorageEngine::instance()->get_file_or_directory_size(trash_path);
888
18
    } catch (const std::filesystem::filesystem_error& e) {
889
0
        LOG(WARNING) << "update trash capacity failed, path: " << _path << ", err: " << e.what();
890
0
        return;
891
0
    }
892
18
    disks_trash_used_capacity->set_value(_trash_used_bytes);
893
18
    LOG(INFO) << "path: " << _path << " trash capacity: " << _trash_used_bytes;
894
18
}
895
896
18
void DataDir::update_local_data_size(int64_t size) {
897
18
    disks_local_used_capacity->set_value(size);
898
18
}
899
900
18
void DataDir::update_remote_data_size(int64_t size) {
901
18
    disks_remote_used_capacity->set_value(size);
902
18
}
903
904
0
size_t DataDir::tablet_size() const {
905
0
    std::lock_guard<std::mutex> l(_mutex);
906
0
    return _tablet_set.size();
907
0
}
908
909
243
bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
910
243
    double used_pct = get_usage(incoming_data_size);
911
243
    int64_t left_bytes = _available_bytes - incoming_data_size;
912
243
    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
913
243
        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
914
0
        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
915
0
                     << ", left bytes: " << left_bytes << ", path: " << _path;
916
0
        return true;
917
0
    }
918
243
    return false;
919
243
}
920
921
0
void DataDir::disks_compaction_score_increment(int64_t delta) {
922
0
    disks_compaction_score->increment(delta);
923
0
}
924
925
0
void DataDir::disks_compaction_num_increment(int64_t delta) {
926
0
    disks_compaction_num->increment(delta);
927
0
}
928
929
13
Status DataDir::move_to_trash(const std::string& tablet_path) {
930
13
    if (config::trash_file_expire_time_sec <= 0) {
931
0
        LOG(INFO) << "delete tablet dir " << tablet_path
932
0
                  << " directly due to trash_file_expire_time_sec is 0";
933
0
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
934
0
        return delete_tablet_parent_path_if_empty(tablet_path);
935
0
    }
936
937
13
    Status res = Status::OK();
938
    // 1. get timestamp string
939
13
    string time_str;
940
13
    if ((res = gen_timestamp_string(&time_str)) != Status::OK()) {
941
0
        LOG(WARNING) << "failed to generate time_string when move file to trash.err code=" << res;
942
0
        return res;
943
0
    }
944
945
    // 2. generate new file path
946
    // a global counter to avoid file name duplication.
947
13
    static std::atomic<uint64_t> delete_counter(0);
948
13
    auto trash_root_path =
949
13
            fmt::format("{}/{}/{}.{}", _path, TRASH_PREFIX, time_str, delete_counter++);
950
13
    auto fs_tablet_path = io::Path(tablet_path);
951
13
    auto trash_tablet_path = trash_root_path /
952
13
                             fs_tablet_path.parent_path().filename() /* tablet_id */ /
953
13
                             fs_tablet_path.filename() /* schema_hash */;
954
955
    // 3. create target dir, or the rename() function will fail.
956
13
    auto trash_tablet_parent = trash_tablet_path.parent_path();
957
    // create dir if not exists
958
13
    bool exists = true;
959
13
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(trash_tablet_parent, &exists));
960
13
    if (!exists) {
961
13
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(trash_tablet_parent));
962
13
    }
963
964
    // 4. move tablet to trash
965
13
    VLOG_NOTICE << "move file to trash. " << tablet_path << " -> " << trash_tablet_path;
966
13
    if (rename(tablet_path.c_str(), trash_tablet_path.c_str()) < 0) {
967
0
        return Status::Error<OS_ERROR>("move file to trash failed. file={}, target={}, err={}",
968
0
                                       tablet_path, trash_tablet_path.native(), Errno::str());
969
0
    }
970
971
    // 5. check parent dir of source file, delete it when empty
972
13
    RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path));
973
974
13
    return Status::OK();
975
13
}
976
977
13
Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) {
978
13
    auto fs_tablet_path = io::Path(tablet_path);
979
13
    std::string source_parent_dir = fs_tablet_path.parent_path(); // tablet_id level
980
13
    std::vector<io::FileInfo> sub_files;
981
13
    bool exists = true;
982
13
    RETURN_IF_ERROR(
983
13
            io::global_local_filesystem()->list(source_parent_dir, false, &sub_files, &exists));
984
13
    if (sub_files.empty()) {
985
13
        LOG(INFO) << "remove empty dir " << source_parent_dir;
986
        // no need to examine the return status
987
13
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(source_parent_dir));
988
13
    }
989
13
    return Status::OK();
990
13
}
991
992
18
void DataDir::perform_remote_rowset_gc() {
993
18
    std::vector<std::pair<std::string, std::string>> gc_kvs;
994
18
    auto traverse_remote_rowset_func = [&gc_kvs](const std::string& key,
995
18
                                                 const std::string& value) -> bool {
996
0
        gc_kvs.emplace_back(key, value);
997
0
        return true;
998
0
    };
999
18
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX,
1000
18
                                     traverse_remote_rowset_func));
1001
18
    std::vector<std::string> deleted_keys;
1002
18
    for (auto& [key, val] : gc_kvs) {
1003
0
        auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
1004
0
        RemoteRowsetGcPB gc_pb;
1005
0
        if (!gc_pb.ParseFromString(val)) {
1006
0
            LOG(WARNING) << "malformed RemoteRowsetGcPB. rowset_id=" << rowset_id;
1007
0
            deleted_keys.push_back(std::move(key));
1008
0
            continue;
1009
0
        }
1010
0
        auto fs = get_filesystem(gc_pb.resource_id());
1011
0
        if (!fs) {
1012
0
            LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
1013
0
            continue;
1014
0
        }
1015
0
        DCHECK(fs->type() != io::FileSystemType::LOCAL);
1016
0
        std::vector<io::Path> seg_paths;
1017
0
        seg_paths.reserve(gc_pb.num_segments());
1018
0
        for (int i = 0; i < gc_pb.num_segments(); ++i) {
1019
0
            seg_paths.push_back(BetaRowset::remote_segment_path(gc_pb.tablet_id(), rowset_id, i));
1020
0
        }
1021
0
        LOG(INFO) << "delete remote rowset. root_path=" << fs->root_path()
1022
0
                  << ", rowset_id=" << rowset_id;
1023
0
        auto st = std::static_pointer_cast<io::RemoteFileSystem>(fs)->batch_delete(seg_paths);
1024
0
        if (st.ok()) {
1025
0
            deleted_keys.push_back(std::move(key));
1026
0
        } else {
1027
0
            LOG(WARNING) << "failed to delete remote rowset. err=" << st;
1028
0
        }
1029
0
    }
1030
18
    for (const auto& key : deleted_keys) {
1031
0
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
1032
0
    }
1033
18
}
1034
1035
18
void DataDir::perform_remote_tablet_gc() {
1036
18
    std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
1037
18
    auto traverse_remote_tablet_func = [&tablet_gc_kvs](const std::string& key,
1038
18
                                                        const std::string& value) -> bool {
1039
0
        tablet_gc_kvs.emplace_back(key, value);
1040
0
        return true;
1041
0
    };
1042
18
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX,
1043
18
                                     traverse_remote_tablet_func));
1044
18
    std::vector<std::string> deleted_keys;
1045
18
    for (auto& [key, val] : tablet_gc_kvs) {
1046
0
        auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
1047
0
        RemoteTabletGcPB gc_pb;
1048
0
        if (!gc_pb.ParseFromString(val)) {
1049
0
            LOG(WARNING) << "malformed RemoteTabletGcPB. tablet_id=" << tablet_id;
1050
0
            deleted_keys.push_back(std::move(key));
1051
0
            continue;
1052
0
        }
1053
0
        bool success = true;
1054
0
        for (auto& resource_id : gc_pb.resource_ids()) {
1055
0
            auto fs = get_filesystem(resource_id);
1056
0
            if (!fs) {
1057
0
                LOG(WARNING) << "could not get file system. resource_id=" << resource_id;
1058
0
                success = false;
1059
0
                continue;
1060
0
            }
1061
0
            LOG(INFO) << "delete remote rowsets of tablet. root_path=" << fs->root_path()
1062
0
                      << ", tablet_id=" << tablet_id;
1063
0
            auto st = fs->delete_directory(DATA_PREFIX + '/' + tablet_id);
1064
0
            if (!st.ok()) {
1065
0
                LOG(WARNING) << "failed to delete all remote rowset in tablet. err=" << st;
1066
0
                success = false;
1067
0
            }
1068
0
        }
1069
0
        if (success) {
1070
0
            deleted_keys.push_back(std::move(key));
1071
0
        }
1072
0
    }
1073
18
    for (const auto& key : deleted_keys) {
1074
0
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
1075
0
    }
1076
18
}
1077
1078
} // namespace doris