Coverage Report

Created: 2026-04-03 12:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/data_dir.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/data_dir.h"
19
20
#include <fmt/core.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <atomic>
27
#include <cstdio>
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <chrono> // IWYU pragma: keep
30
#include <cstddef>
31
#include <filesystem>
32
#include <memory>
33
#include <new>
34
#include <roaring/roaring.hh>
35
#include <set>
36
#include <sstream>
37
#include <string>
38
#include <thread>
39
#include <utility>
40
41
#include "common/cast_set.h"
42
#include "common/config.h"
43
#include "common/logging.h"
44
#include "common/metrics/doris_metrics.h"
45
#include "io/fs/file_reader.h"
46
#include "io/fs/file_writer.h"
47
#include "io/fs/local_file_system.h"
48
#include "io/fs/path.h"
49
#include "service/backend_options.h"
50
#include "storage/delete/delete_handler.h"
51
#include "storage/olap_common.h"
52
#include "storage/olap_define.h"
53
#include "storage/olap_meta.h"
54
#include "storage/rowset/beta_rowset.h"
55
#include "storage/rowset/pending_rowset_helper.h"
56
#include "storage/rowset/rowset.h"
57
#include "storage/rowset/rowset_id_generator.h"
58
#include "storage/rowset/rowset_meta.h"
59
#include "storage/rowset/rowset_meta_manager.h"
60
#include "storage/storage_engine.h"
61
#include "storage/storage_policy.h"
62
#include "storage/tablet/tablet.h"
63
#include "storage/tablet/tablet_manager.h"
64
#include "storage/tablet/tablet_meta_manager.h"
65
#include "storage/txn/txn_manager.h"
66
#include "storage/utils.h" // for check_dir_existed
67
#include "util/string_util.h"
68
#include "util/uid_util.h"
69
70
namespace doris {
71
#include "common/compile_check_begin.h"
72
using namespace ErrorCode;
73
74
namespace {
75
76
101
Status read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id) {
77
101
    bool exists = false;
78
101
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
79
101
    *cluster_id = -1;
80
101
    if (exists) {
81
4
        io::FileReaderSPtr reader;
82
4
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
83
4
        size_t fsize = reader->size();
84
4
        if (fsize > 0) {
85
4
            std::string content;
86
4
            content.resize(fsize, '\0');
87
4
            size_t bytes_read = 0;
88
4
            RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
89
4
            DCHECK_EQ(fsize, bytes_read);
90
4
            *cluster_id = std::stoi(content);
91
4
        }
92
4
    }
93
101
    return Status::OK();
94
101
}
95
96
2
Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id) {
97
2
    bool exists = false;
98
2
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(path, &exists));
99
2
    if (!exists) {
100
2
        io::FileWriterPtr file_writer;
101
2
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &file_writer));
102
2
        RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id)));
103
2
        RETURN_IF_ERROR(file_writer->close());
104
2
    }
105
2
    return Status::OK();
106
2
}
107
108
} // namespace
109
110
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_total_capacity, MetricUnit::BYTES);
111
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_avail_capacity, MetricUnit::BYTES);
112
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_local_used_capacity, MetricUnit::BYTES);
113
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_remote_used_capacity, MetricUnit::BYTES);
114
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_trash_used_capacity, MetricUnit::BYTES);
115
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_state, MetricUnit::BYTES);
116
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_score, MetricUnit::NOUNIT);
117
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_num, MetricUnit::NOUNIT);
118
119
DataDir::DataDir(StorageEngine& engine, const std::string& path, int64_t capacity_bytes,
120
                 TStorageMedium::type storage_medium)
121
254
        : _engine(engine),
122
254
          _path(path),
123
254
          _available_bytes(0),
124
254
          _disk_capacity_bytes(0),
125
254
          _trash_used_bytes(0),
126
254
          _storage_medium(storage_medium),
127
254
          _is_used(false),
128
254
          _cluster_id(-1),
129
254
          _to_be_deleted(false) {
130
254
    _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
131
254
            std::string("data_dir.") + path, {{"path", path}});
132
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity);
133
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
134
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_local_used_capacity);
135
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_remote_used_capacity);
136
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_trash_used_capacity);
137
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
138
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
139
254
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);
140
254
}
141
142
251
DataDir::~DataDir() {
143
251
    DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity);
144
251
    delete _meta;
145
251
}
146
147
101
Status DataDir::init(bool init_meta) {
148
101
    bool exists = false;
149
101
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
150
101
    if (!exists) {
151
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
152
0
                                       "check file exist failed");
153
0
    }
154
155
101
    RETURN_NOT_OK_STATUS_WITH_WARN(update_capacity(), "update_capacity failed");
156
101
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
157
101
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity_and_create_shards(),
158
101
                                   "_init_capacity_and_create_shards failed");
159
101
    if (init_meta) {
160
101
        RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
161
101
    }
162
163
101
    _is_used = true;
164
101
    return Status::OK();
165
101
}
166
167
48
void DataDir::stop_bg_worker() {
168
48
    _stop_bg_worker = true;
169
48
}
170
171
101
Status DataDir::_init_cluster_id() {
172
101
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
173
101
    RETURN_IF_ERROR(read_cluster_id(cluster_id_path, &_cluster_id));
174
101
    if (_cluster_id == -1) {
175
97
        _cluster_id_incomplete = true;
176
97
    }
177
101
    return Status::OK();
178
101
}
179
180
101
Status DataDir::_init_capacity_and_create_shards() {
181
101
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
182
101
                                                                  &_available_bytes));
183
101
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
184
101
    bool exists = false;
185
101
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(data_path, &exists));
186
101
    if (!exists) {
187
97
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(data_path));
188
97
    }
189
103k
    for (int i = 0; i < MAX_SHARD_NUM; ++i) {
190
103k
        auto shard_path = fmt::format("{}/{}", data_path, i);
191
103k
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(shard_path, &exists));
192
103k
        if (!exists) {
193
99.3k
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(shard_path));
194
99.3k
        }
195
103k
    }
196
197
101
    return Status::OK();
198
101
}
199
200
102
Status DataDir::_init_meta() {
201
    // init path hash
202
102
    _path_hash = hash_of_path(BackendOptions::get_localhost(), _path);
203
102
    LOG(INFO) << "path: " << _path << ", hash: " << _path_hash;
204
205
    // init meta
206
102
    _meta = new (std::nothrow) OlapMeta(_path);
207
102
    if (_meta == nullptr) {
208
0
        RETURN_NOT_OK_STATUS_WITH_WARN(
209
0
                Status::MemoryAllocFailed("allocate memory for OlapMeta failed"),
210
0
                "new OlapMeta failed");
211
0
    }
212
102
    Status res = _meta->init();
213
102
    if (!res.ok()) {
214
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("open rocksdb failed, path={}", _path),
215
0
                                       "init OlapMeta failed");
216
0
    }
217
102
    return Status::OK();
218
102
}
219
220
2
Status DataDir::set_cluster_id(int32_t cluster_id) {
221
2
    if (_cluster_id != -1 && _cluster_id != cluster_id) {
222
0
        LOG(ERROR) << "going to set cluster id to already assigned store, cluster_id="
223
0
                   << _cluster_id << ", new_cluster_id=" << cluster_id;
224
0
        return Status::InternalError("going to set cluster id to already assigned store");
225
0
    }
226
2
    if (!_cluster_id_incomplete) {
227
0
        return Status::OK();
228
0
    }
229
2
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
230
2
    return _write_cluster_id_to_path(cluster_id_path, cluster_id);
231
2
}
232
233
1.76k
void DataDir::health_check() {
234
    // check disk
235
1.76k
    if (_is_used) {
236
1.76k
        Status res = _read_and_write_test_file();
237
1.76k
        if (!res && res.is<IO_ERROR>()) {
238
0
            LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path
239
0
                         << ", err: " << res;
240
0
            _engine.add_broken_path(_path);
241
0
            _is_used = !res.is<IO_ERROR>();
242
0
        }
243
1.76k
    }
244
1.76k
    disks_state->set_value(_is_used ? 1 : 0);
245
1.76k
}
246
247
1.76k
Status DataDir::_read_and_write_test_file() {
248
1.76k
    auto test_file = fmt::format("{}/{}", _path, kTestFilePath);
249
1.76k
    return read_write_test_file(test_file);
250
1.76k
}
251
252
286k
void DataDir::register_tablet(Tablet* tablet) {
253
286k
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
254
255
286k
    std::lock_guard<std::mutex> l(_mutex);
256
286k
    _tablet_set.emplace(std::move(tablet_info));
257
286k
}
258
259
14.4k
void DataDir::deregister_tablet(Tablet* tablet) {
260
14.4k
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
261
262
14.4k
    std::lock_guard<std::mutex> l(_mutex);
263
14.4k
    _tablet_set.erase(tablet_info);
264
14.4k
}
265
266
0
void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) {
267
0
    std::lock_guard<std::mutex> l(_mutex);
268
269
0
    tablet_infos->insert(tablet_infos->end(), _tablet_set.begin(), _tablet_set.end());
270
0
    _tablet_set.clear();
271
0
}
272
273
0
std::string DataDir::get_absolute_shard_path(int64_t shard_id) {
274
0
    return fmt::format("{}/{}/{}", _path, DATA_PREFIX, shard_id);
275
0
}
276
277
std::string DataDir::get_absolute_tablet_path(int64_t shard_id, int64_t tablet_id,
278
0
                                              int32_t schema_hash) {
279
0
    return fmt::format("{}/{}/{}", get_absolute_shard_path(shard_id), tablet_id, schema_hash);
280
0
}
281
282
0
void DataDir::find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* paths) {
283
    // path: /root_path/trash/time_label/tablet_id/schema_hash
284
0
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
285
0
    bool exists = true;
286
0
    std::vector<io::FileInfo> sub_dirs;
287
0
    Status st = io::global_local_filesystem()->list(trash_path, false, &sub_dirs, &exists);
288
0
    if (!st) {
289
0
        return;
290
0
    }
291
292
0
    for (auto& sub_dir : sub_dirs) {
293
        // sub dir is time_label
294
0
        if (sub_dir.is_file) {
295
0
            continue;
296
0
        }
297
0
        auto sub_path = fmt::format("{}/{}", trash_path, sub_dir.file_name);
298
0
        auto tablet_path = fmt::format("{}/{}", sub_path, tablet_id);
299
0
        st = io::global_local_filesystem()->exists(tablet_path, &exists);
300
0
        if (st && exists) {
301
0
            paths->emplace_back(std::move(tablet_path));
302
0
        }
303
0
    }
304
0
}
305
306
std::string DataDir::get_root_path_from_schema_hash_path_in_trash(
307
0
        const std::string& schema_hash_dir_in_trash) {
308
0
    return io::Path(schema_hash_dir_in_trash)
309
0
            .parent_path()
310
0
            .parent_path()
311
0
            .parent_path()
312
0
            .parent_path()
313
0
            .string();
314
0
}
315
316
51
Status DataDir::_check_incompatible_old_format_tablet() {
317
51
    auto check_incompatible_old_func = [](int64_t tablet_id, int32_t schema_hash,
318
51
                                          std::string_view value) -> bool {
319
        // if strict check incompatible old format, then log fatal
320
0
        if (config::storage_strict_check_incompatible_old_format) {
321
0
            throw Exception(Status::FatalError(
322
0
                    "There are incompatible old format metas, current version does not support and "
323
0
                    "it may lead to data missing!!! tablet_id = {} schema_hash = {}",
324
0
                    tablet_id, schema_hash));
325
0
        } else {
326
0
            LOG(WARNING)
327
0
                    << "There are incompatible old format metas, current version does not support "
328
0
                    << "and it may lead to data missing!!! "
329
0
                    << "tablet_id = " << tablet_id << " schema_hash = " << schema_hash;
330
0
        }
331
0
        return false;
332
0
    };
333
334
    // seek old header prefix. when check_incompatible_old_func is called, it has old format in olap_meta
335
51
    Status check_incompatible_old_status = TabletMetaManager::traverse_headers(
336
51
            _meta, check_incompatible_old_func, OLD_HEADER_PREFIX);
337
51
    if (!check_incompatible_old_status) {
338
0
        LOG(WARNING) << "check incompatible old format meta fails, it may lead to data missing!!! "
339
0
                     << _path;
340
51
    } else {
341
51
        LOG(INFO) << "successfully check incompatible old format meta " << _path;
342
51
    }
343
51
    return check_incompatible_old_status;
344
51
}
345
346
// TODO(ygl): deal with rowsets and tablets when load failed
347
51
Status DataDir::load() {
348
51
    LOG(INFO) << "start to load tablets from " << _path;
349
350
    // load rowset meta from meta env and create rowset
351
    // COMMITTED: add to txn manager
352
    // VISIBLE: add to tablet
353
    // if one rowset load failed, then the total data dir will not be loaded
354
355
    // necessarily check incompatible old format. when there are old metas, it may load to data missing
356
51
    RETURN_IF_ERROR(_check_incompatible_old_format_tablet());
357
358
51
    std::vector<RowsetMetaSharedPtr> dir_rowset_metas;
359
51
    LOG(INFO) << "begin loading rowset from meta";
360
51
    auto load_rowset_func = [&dir_rowset_metas, this](TabletUid tablet_uid, RowsetId rowset_id,
361
17.6k
                                                      std::string_view meta_str) -> bool {
362
17.6k
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
363
17.6k
        bool parsed = rowset_meta->init(meta_str);
364
17.6k
        if (!parsed) {
365
0
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
366
            // return false will break meta iterator, return true to skip this error
367
0
            return true;
368
0
        }
369
370
17.6k
        if (rowset_meta->has_delete_predicate()) {
371
            // copy the delete sub pred v1 to check then
372
3.49k
            auto orig_delete_sub_pred = rowset_meta->delete_predicate().sub_predicates();
373
3.49k
            auto* delete_pred = rowset_meta->mutable_delete_pred_pb();
374
375
3.49k
            if ((!delete_pred->sub_predicates().empty() &&
376
3.49k
                 delete_pred->sub_predicates_v2().empty()) ||
377
3.49k
                (!delete_pred->in_predicates().empty() &&
378
3.49k
                 delete_pred->in_predicates()[0].has_column_unique_id())) {
379
                // convert pred and write only when delete sub pred v2 is not set or there is in list pred to be set column uid
380
256
                RETURN_IF_ERROR(DeleteHandler::convert_to_sub_pred_v2(
381
256
                        delete_pred, rowset_meta->tablet_schema()));
382
256
                LOG(INFO) << fmt::format(
383
256
                        "convert rowset with old delete pred: rowset_id={}, tablet_id={}",
384
256
                        rowset_id.to_string(), tablet_uid.to_string());
385
256
                CHECK_EQ(orig_delete_sub_pred.size(), delete_pred->sub_predicates().size())
386
0
                        << "inconsistent sub predicate v1 after conversion";
387
256
                for (int i = 0; i < orig_delete_sub_pred.size(); ++i) {
388
0
                    CHECK_STREQ(orig_delete_sub_pred.Get(i).c_str(),
389
0
                                delete_pred->sub_predicates().Get(i).c_str())
390
0
                            << "inconsistent sub predicate v1 after conversion";
391
0
                }
392
256
                std::string result;
393
256
                rowset_meta->serialize(&result);
394
256
                std::string key =
395
256
                        ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
396
256
                RETURN_IF_ERROR(_meta->put(META_COLUMN_FAMILY_INDEX, key, result));
397
256
            }
398
3.49k
        }
399
400
17.6k
        if (rowset_meta->partition_id() == 0) {
401
0
            LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
402
0
                         << " load from meta but partition id eq 0";
403
0
        }
404
405
17.6k
        dir_rowset_metas.push_back(rowset_meta);
406
17.6k
        return true;
407
17.6k
    };
408
51
    MonotonicStopWatch rs_timer;
409
51
    rs_timer.start();
410
51
    Status load_rowset_status = RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
411
51
    rs_timer.stop();
412
51
    if (!load_rowset_status) {
413
0
        LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
414
51
    } else {
415
51
        LOG(INFO) << "load rowset from meta finished, cost: "
416
51
                  << rs_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
417
51
    }
418
419
    // load tablet
420
    // create tablet from tablet meta and add it to tablet mgr
421
51
    LOG(INFO) << "begin loading tablet from meta";
422
51
    std::set<int64_t> tablet_ids;
423
51
    std::set<int64_t> failed_tablet_ids;
424
51
    auto load_tablet_func = [this, &tablet_ids, &failed_tablet_ids](
425
51
                                    int64_t tablet_id, int32_t schema_hash,
426
310k
                                    std::string_view value) -> bool {
427
310k
        Status status = _engine.tablet_manager()->load_tablet_from_meta(
428
310k
                this, tablet_id, schema_hash, value, false, false, false, false);
429
310k
        if (!status.ok() && !status.is<TABLE_ALREADY_DELETED_ERROR>() &&
430
310k
            !status.is<ENGINE_INSERT_OLD_TABLET>()) {
431
            // load_tablet_from_meta() may return Status::Error<TABLE_ALREADY_DELETED_ERROR>()
432
            // which means the tablet status is DELETED
433
            // This may happen when the tablet was just deleted before the BE restarted,
434
            // but it has not been cleared from rocksdb. At this time, restarting the BE
435
            // will read the tablet in the DELETE state from rocksdb. These tablets have been
436
            // added to the garbage collection queue and will be automatically deleted afterwards.
437
            // Therefore, we believe that this situation is not a failure.
438
439
            // Besides, load_tablet_from_meta() may return Status::Error<ENGINE_INSERT_OLD_TABLET>()
440
            // when BE is restarting and the older tablet have been added to the
441
            // garbage collection queue but not deleted yet.
442
            // In this case, since the data_dirs are parallel loaded, a later loaded tablet
443
            // may be older than previously loaded one, which should not be acknowledged as a
444
            // failure.
445
0
            LOG(WARNING) << "load tablet from header failed. status:" << status
446
0
                         << ", tablet=" << tablet_id << "." << schema_hash;
447
0
            failed_tablet_ids.insert(tablet_id);
448
310k
        } else {
449
310k
            tablet_ids.insert(tablet_id);
450
310k
        }
451
310k
        return true;
452
310k
    };
453
51
    MonotonicStopWatch tablet_timer;
454
51
    tablet_timer.start();
455
51
    Status load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
456
51
    tablet_timer.stop();
457
51
    if (!failed_tablet_ids.empty()) {
458
0
        LOG(WARNING) << "load tablets from header failed"
459
0
                     << ", loaded tablet: " << tablet_ids.size()
460
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
461
0
        if (!config::ignore_load_tablet_failure) {
462
0
            throw Exception(Status::FatalError(
463
0
                    "load tablets encounter failure. stop BE process. path: {}", _path));
464
0
        }
465
0
    }
466
51
    if (!load_tablet_status) {
467
0
        LOG(WARNING) << "there is failure when loading tablet headers"
468
0
                     << ", loaded tablet: " << tablet_ids.size()
469
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
470
51
    } else {
471
51
        LOG(INFO) << "load tablet from meta finished"
472
51
                  << ", loaded tablet: " << tablet_ids.size()
473
51
                  << ", error tablet: " << failed_tablet_ids.size()
474
51
                  << ", cost: " << tablet_timer.elapsed_time_milliseconds()
475
51
                  << " ms, path: " << _path;
476
51
    }
477
478
310k
    for (int64_t tablet_id : tablet_ids) {
479
310k
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
480
310k
        if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
481
0
            RETURN_IF_ERROR(TabletMetaManager::save(this, tablet->tablet_id(),
482
0
                                                    tablet->schema_hash(), tablet->tablet_meta()));
483
0
        }
484
310k
    }
485
486
51
    auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id,
487
51
                                                              int64_t publish_version,
488
51
                                                              std::string_view info) {
489
0
        PendingPublishInfoPB pending_publish_info_pb;
490
0
        bool parsed =
491
0
                pending_publish_info_pb.ParseFromArray(info.data(), cast_set<int>(info.size()));
492
0
        if (!parsed) {
493
0
            LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
494
0
                         << " publish_version: " << publish_version;
495
0
        }
496
0
        engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
497
0
                                      publish_version, pending_publish_info_pb.transaction_id(),
498
0
                                      true, pending_publish_info_pb.commit_tso());
499
0
        return true;
500
0
    };
501
51
    MonotonicStopWatch pending_publish_timer;
502
51
    pending_publish_timer.start();
503
51
    RETURN_IF_ERROR(
504
51
            TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func));
505
51
    pending_publish_timer.stop();
506
51
    LOG(INFO) << "load pending publish task from meta finished, cost: "
507
51
              << pending_publish_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
508
509
51
    int64_t rowset_partition_id_eq_0_num = 0;
510
17.6k
    for (auto rowset_meta : dir_rowset_metas) {
511
17.6k
        if (rowset_meta->partition_id() == 0) {
512
0
            ++rowset_partition_id_eq_0_num;
513
0
        }
514
17.6k
    }
515
51
    if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
516
0
        throw Exception(Status::FatalError(
517
0
                "rowset partition id eq 0 is {} bigger than config {}, be exit, plz check be.INFO",
518
0
                rowset_partition_id_eq_0_num, config::ignore_invalid_partition_id_rowset_num));
519
0
    }
520
521
    // traverse rowset
522
    // 1. add committed rowset to txn map
523
    // 2. add visible rowset to tablet
524
    // ignore any errors when load tablet or rowset, because fe will repair them after report
525
51
    int64_t invalid_rowset_counter = 0;
526
17.6k
    for (auto&& rowset_meta : dir_rowset_metas) {
527
17.6k
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(rowset_meta->tablet_id());
528
        // tablet maybe dropped, but not drop related rowset meta
529
17.6k
        if (tablet == nullptr) {
530
0
            VLOG_NOTICE << "could not find tablet id: " << rowset_meta->tablet_id()
531
0
                        << ", schema hash: " << rowset_meta->tablet_schema_hash()
532
0
                        << ", for rowset: " << rowset_meta->rowset_id() << ", skip this rowset";
533
0
            ++invalid_rowset_counter;
534
0
            continue;
535
0
        }
536
537
17.6k
        if (rowset_meta->partition_id() == 0) {
538
0
            LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
539
0
                         << " rowset: " << rowset_meta->rowset_id()
540
0
                         << " txn: " << rowset_meta->txn_id();
541
0
            continue;
542
0
        }
543
544
17.6k
        RowsetSharedPtr rowset;
545
17.6k
        Status create_status = tablet->create_rowset(rowset_meta, &rowset);
546
17.6k
        if (!create_status) {
547
0
            LOG(WARNING) << "could not create rowset from rowsetmeta: "
548
0
                         << " rowset_id: " << rowset_meta->rowset_id()
549
0
                         << " rowset_type: " << rowset_meta->rowset_type()
550
0
                         << " rowset_state: " << rowset_meta->rowset_state();
551
0
            continue;
552
0
        }
553
17.6k
        if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
554
17.6k
            rowset_meta->tablet_uid() == tablet->tablet_uid()) {
555
100
            if (!rowset_meta->tablet_schema()) {
556
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
557
0
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
558
0
                                                        rowset_meta->rowset_id(),
559
0
                                                        rowset_meta->get_rowset_pb(), false));
560
0
            }
561
100
            Status commit_txn_status = _engine.txn_manager()->commit_txn(
562
100
                    _meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
563
100
                    rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
564
100
                    rowset, _engine.pending_local_rowsets().add(rowset_meta->rowset_id()), true);
565
100
            if (commit_txn_status || commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
566
100
                LOG(INFO) << "successfully to add committed rowset: " << rowset_meta->rowset_id()
567
100
                          << " to tablet: " << rowset_meta->tablet_id()
568
100
                          << " schema hash: " << rowset_meta->tablet_schema_hash()
569
100
                          << " for txn: " << rowset_meta->txn_id();
570
571
100
            } else if (commit_txn_status.is<ErrorCode::INTERNAL_ERROR>()) {
572
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
573
0
                             << " to tablet: " << rowset_meta->tablet_id()
574
0
                             << " for txn: " << rowset_meta->txn_id()
575
0
                             << " error: " << commit_txn_status;
576
0
                return commit_txn_status;
577
0
            } else {
578
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
579
0
                             << " to tablet: " << rowset_meta->tablet_id()
580
0
                             << " for txn: " << rowset_meta->txn_id()
581
0
                             << " error: " << commit_txn_status;
582
0
            }
583
17.5k
        } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
584
17.5k
                   rowset_meta->tablet_uid() == tablet->tablet_uid()) {
585
17.5k
            if (!rowset_meta->tablet_schema()) {
586
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
587
0
                RETURN_IF_ERROR(RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(),
588
0
                                                        rowset_meta->rowset_id(),
589
0
                                                        rowset_meta->get_rowset_pb(), false));
590
0
            }
591
17.5k
            Status publish_status = tablet->add_rowset(rowset);
592
17.5k
            if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
593
0
                LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
594
0
                             << rowset->rowset_id() << " tablet id: " << rowset_meta->tablet_id()
595
0
                             << " txn id:" << rowset_meta->txn_id()
596
0
                             << " start_version: " << rowset_meta->version().first
597
0
                             << " end_version: " << rowset_meta->version().second;
598
0
            }
599
17.5k
        } else {
600
0
            LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()
601
0
                         << " with tablet id: " << rowset_meta->tablet_id()
602
0
                         << " tablet uid: " << rowset_meta->tablet_uid()
603
0
                         << " schema hash: " << rowset_meta->tablet_schema_hash()
604
0
                         << " txn: " << rowset_meta->txn_id()
605
0
                         << " current valid tablet uid: " << tablet->tablet_uid();
606
0
            ++invalid_rowset_counter;
607
0
        }
608
17.6k
    }
609
610
51
    int64_t dbm_cnt {0};
611
51
    int64_t unknown_dbm_cnt {0};
612
51
    auto load_delete_bitmap_func = [this, &dbm_cnt, &unknown_dbm_cnt](int64_t tablet_id,
613
51
                                                                      int64_t version,
614
516
                                                                      std::string_view val) {
615
516
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
616
516
        if (!tablet) {
617
0
            return true;
618
0
        }
619
516
        const auto& all_rowsets = tablet->tablet_meta()->all_rs_metas();
620
516
        RowsetIdUnorderedSet rowset_ids;
621
2.14k
        for (const auto& [_, rowset_meta] : all_rowsets) {
622
2.14k
            rowset_ids.insert(rowset_meta->rowset_id());
623
2.14k
        }
624
625
516
        DeleteBitmapPB delete_bitmap_pb;
626
516
        delete_bitmap_pb.ParseFromArray(val.data(), cast_set<int>(val.size()));
627
516
        int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
628
516
        int seg_ids_size = delete_bitmap_pb.segment_ids_size();
629
516
        int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
630
516
        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);
631
632
1.81k
        for (int i = 0; i < rst_ids_size; ++i) {
633
1.30k
            RowsetId rst_id;
634
1.30k
            rst_id.init(delete_bitmap_pb.rowset_ids(i));
635
            // only process the rowset in _rs_metas
636
1.30k
            if (rowset_ids.find(rst_id) == rowset_ids.end()) {
637
0
                ++unknown_dbm_cnt;
638
0
                continue;
639
0
            }
640
1.30k
            ++dbm_cnt;
641
1.30k
            auto seg_id = delete_bitmap_pb.segment_ids(i);
642
1.30k
            auto iter = tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
643
1.30k
                    {rst_id, seg_id, version});
644
            // This version of delete bitmap already exists
645
1.30k
            if (iter != tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
646
0
                continue;
647
0
            }
648
1.30k
            auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
649
1.30k
            tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
650
1.30k
                    roaring::Roaring::read(bitmap);
651
1.30k
        }
652
516
        return true;
653
516
    };
654
51
    MonotonicStopWatch dbm_timer;
655
51
    dbm_timer.start();
656
51
    RETURN_IF_ERROR(TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func));
657
51
    dbm_timer.stop();
658
659
51
    LOG(INFO) << "load delete bitmap from meta finished, cost: "
660
51
              << dbm_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
661
662
    // At startup, we only count these invalid rowset, but do not actually delete it.
663
    // The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
664
    // which is cleaned up uniformly by the background cleanup thread.
665
51
    LOG(INFO) << "finish to load tablets from " << _path
666
51
              << ", total rowset meta: " << dir_rowset_metas.size()
667
51
              << ", invalid rowset num: " << invalid_rowset_counter
668
51
              << ", visible/stale rowsets' delete bitmap count: " << dbm_cnt
669
51
              << ", invalid rowsets' delete bitmap count: " << unknown_dbm_cnt;
670
671
51
    return Status::OK();
672
51
}
673
674
// gc unused local tablet dir
675
306k
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) {
676
306k
    if (_stop_bg_worker) {
677
0
        return;
678
0
    }
679
680
306k
    TTabletId tablet_id = -1;
681
306k
    TSchemaHash schema_hash = -1;
682
306k
    bool is_valid = TabletManager::get_tablet_id_and_schema_hash_from_path(
683
306k
            tablet_schema_hash_path, &tablet_id, &schema_hash);
684
306k
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
685
0
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
686
0
        return;
687
0
    }
688
689
306k
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
690
306k
    if (!tablet || tablet->data_dir() != this) {
691
32.3k
        if (tablet) {
692
0
            LOG(INFO) << "The tablet in path " << tablet_schema_hash_path
693
0
                      << " is not same with the running one: " << tablet->tablet_path()
694
0
                      << ", might be the old tablet after migration, try to move it to trash";
695
0
        }
696
32.3k
        _engine.tablet_manager()->try_delete_unused_tablet_path(this, tablet_id, schema_hash,
697
32.3k
                                                                tablet_schema_hash_path, shard_id);
698
32.3k
        return;
699
32.3k
    }
700
701
273k
    _perform_rowset_gc(tablet_schema_hash_path);
702
273k
}
703
704
// gc unused local rowsets under tablet dir
//
// Scans `tablet_schema_hash_path` for rowset files and deletes those that are
// garbage: not pending, not in the tablet's version map (visible or stale),
// not tracked as unused by the engine, and absent from the rowset meta store.
// Bails out early (without logging) whenever the background worker is stopping.
void DataDir::_perform_rowset_gc(const std::string& tablet_schema_hash_path) {
    if (_stop_bg_worker) {
        return;
    }

    TTabletId tablet_id = -1;
    TSchemaHash schema_hash = -1;
    bool is_valid = doris::TabletManager::get_tablet_id_and_schema_hash_from_path(
            tablet_schema_hash_path, &tablet_id, &schema_hash);
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
        return;
    }

    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (!tablet) {
        // Could not found the tablet, maybe it's a dropped tablet, will be reclaimed
        // in the next time `_perform_path_gc_by_tablet`
        return;
    }

    if (tablet->data_dir() != this) {
        // Current running tablet is not in same data_dir, maybe it's a tablet after migration,
        // will be reclaimed in the next time `_perform_path_gc_by_tablet`
        return;
    }

    bool exists;
    std::vector<io::FileInfo> files;
    auto st = io::global_local_filesystem()->list(tablet_schema_hash_path, true, &files, &exists);
    if (!st.ok()) [[unlikely]] {
        LOG(WARNING) << "[path gc] fail to list tablet path " << tablet_schema_hash_path << " : "
                     << st;
        return;
    }

    // Rowset files excluding pending rowsets
    std::vector<std::pair<RowsetId, std::string /* filename */>> rowsets_not_pending;
    for (auto&& file : files) {
        auto rowset_id = extract_rowset_id(file.file_name);
        if (rowset_id.hi == 0) {
            continue; // Not a rowset
        }

        // In-flight rowsets (e.g. being written by a load) must never be reclaimed.
        if (_engine.pending_local_rowsets().contains(rowset_id)) {
            continue; // Pending rowset file
        }

        rowsets_not_pending.emplace_back(rowset_id, std::move(file.file_name));
    }

    // Snapshot the ids of every rowset currently referenced by the tablet
    // (the `true` flag also traverses stale rowsets).
    RowsetIdUnorderedSet rowsets_in_version_map;
    tablet->traverse_rowsets(
            [&rowsets_in_version_map](auto& rs) { rowsets_in_version_map.insert(rs->rowset_id()); },
            true);

    // Test hook: optionally block here so tests can race GC against tablet drop.
    DBUG_EXECUTE_IF("DataDir::_perform_rowset_gc.simulation.slow", {
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
        if (target_tablet_id == tablet_id) {
            LOG(INFO) << "debug point wait tablet to remove rsmgr tabletId=" << tablet_id;
            DBUG_BLOCK;
        }
    });

    // Deletes one garbage rowset file; failures are logged and otherwise ignored.
    auto reclaim_rowset_file = [](const std::string& path) {
        auto st = io::global_local_filesystem()->delete_file(path);
        if (!st.ok()) [[unlikely]] {
            LOG(WARNING) << "[path gc] failed to delete garbage rowset file: " << st;
            return;
        }
        LOG(INFO) << "[path gc] delete garbage path: " << path; // Audit log
    };

    // A rowset is garbage only if no in-memory reference exists AND its meta
    // key is confirmed absent from the meta store (META_KEY_NOT_FOUND).
    auto should_reclaim = [&, this](const RowsetId& rowset_id) {
        return !rowsets_in_version_map.contains(rowset_id) &&
               !_engine.check_rowset_id_in_unused_rowsets(rowset_id) &&
               RowsetMetaManager::exists(get_meta(), tablet->tablet_uid(), rowset_id)
                       .is<META_KEY_NOT_FOUND>();
    };

    // rowset_id -> is_garbage
    // A rowset may span several files (segments); cache the verdict so the
    // expensive should_reclaim() check runs once per rowset id.
    std::unordered_map<RowsetId, bool> checked_rowsets;
    for (auto&& [rowset_id, filename] : rowsets_not_pending) {
        if (_stop_bg_worker) {
            return;
        }

        if (auto it = checked_rowsets.find(rowset_id); it != checked_rowsets.end()) {
            if (it->second) { // Is checked garbage rowset
                reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
            }
            continue;
        }

        if (should_reclaim(rowset_id)) {
            // Throttle deletions to limit IO pressure on the data dir.
            if (config::path_gc_check_step > 0 &&
                ++_path_gc_step % config::path_gc_check_step == 0) {
                std::this_thread::sleep_for(
                        std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
            }
            reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
            checked_rowsets.emplace(rowset_id, true);
        } else {
            checked_rowsets.emplace(rowset_id, false);
        }
    }
}
812
813
11
void DataDir::perform_path_gc() {
814
11
    if (_stop_bg_worker) {
815
0
        return;
816
0
    }
817
818
11
    LOG(INFO) << "start to gc data dir " << _path;
819
11
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
820
11
    std::vector<io::FileInfo> shards;
821
11
    bool exists = true;
822
11
    const auto& fs = io::global_local_filesystem();
823
11
    auto st = fs->list(data_path, false, &shards, &exists);
824
11
    if (!st.ok()) [[unlikely]] {
825
0
        LOG(WARNING) << "failed to scan data dir: " << st;
826
0
        return;
827
0
    }
828
829
10.2k
    for (const auto& shard : shards) {
830
10.2k
        if (_stop_bg_worker) {
831
0
            break;
832
0
        }
833
834
10.2k
        if (shard.is_file) {
835
0
            continue;
836
0
        }
837
838
10.2k
        auto shard_path = fmt::format("{}/{}", data_path, shard.file_name);
839
10.2k
        std::vector<io::FileInfo> tablet_ids;
840
10.2k
        st = io::global_local_filesystem()->list(shard_path, false, &tablet_ids, &exists);
841
10.2k
        if (!st.ok()) [[unlikely]] {
842
0
            LOG(WARNING) << "fail to walk dir, shard_path=" << shard_path << " : " << st;
843
0
            continue;
844
0
        }
845
846
306k
        for (const auto& tablet_id : tablet_ids) {
847
306k
            if (_stop_bg_worker) {
848
0
                break;
849
0
            }
850
851
306k
            if (tablet_id.is_file) {
852
0
                continue;
853
0
            }
854
855
306k
            auto tablet_id_path = fmt::format("{}/{}", shard_path, tablet_id.file_name);
856
306k
            std::vector<io::FileInfo> schema_hashes;
857
306k
            st = fs->list(tablet_id_path, false, &schema_hashes, &exists);
858
306k
            if (!st.ok()) [[unlikely]] {
859
10
                LOG(WARNING) << "fail to walk dir, tablet_id_path=" << tablet_id_path << " : "
860
10
                             << st;
861
10
                continue;
862
10
            }
863
864
306k
            for (auto&& schema_hash : schema_hashes) {
865
306k
                if (schema_hash.is_file) {
866
0
                    continue;
867
0
                }
868
869
306k
                if (config::path_gc_check_step > 0 &&
870
306k
                    ++_path_gc_step % config::path_gc_check_step == 0) {
871
304
                    std::this_thread::sleep_for(
872
304
                            std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
873
304
                }
874
306k
                int16_t shard_id = -1;
875
306k
                try {
876
306k
                    shard_id = cast_set<int16_t>(std::stoi(shard.file_name));
877
306k
                } catch (const std::exception&) {
878
0
                    LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name;
879
0
                    continue;
880
0
                }
881
306k
                _perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id);
882
306k
            }
883
306k
        }
884
10.2k
    }
885
886
11
    LOG(INFO) << "gc data dir path: " << _path << " finished";
887
11
}
888
889
566
// Refresh this dir's cached disk totals from the local filesystem and publish
// them as metric gauges. Returns the error from get_space_info on failure.
Status DataDir::update_capacity() {
    const auto& local_fs = io::global_local_filesystem();
    RETURN_IF_ERROR(local_fs->get_space_info(_path, &_disk_capacity_bytes, &_available_bytes));

    disks_total_capacity->set_value(_disk_capacity_bytes);
    disks_avail_capacity->set_value(_available_bytes);

    LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
              << ", available capacity: " << _available_bytes << ", usage: " << get_usage(0)
              << ", in_use: " << is_used();
    return Status::OK();
}
900
901
48
void DataDir::update_trash_capacity() {
902
48
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
903
48
    try {
904
48
        _trash_used_bytes = _engine.get_file_or_directory_size(trash_path);
905
48
    } catch (const std::filesystem::filesystem_error& e) {
906
0
        LOG(WARNING) << "update trash capacity failed, path: " << _path << ", err: " << e.what();
907
0
        return;
908
0
    }
909
48
    disks_trash_used_capacity->set_value(_trash_used_bytes);
910
48
    LOG(INFO) << "path: " << _path << " trash capacity: " << _trash_used_bytes;
911
48
}
912
913
347
// Publish the given local (on-disk) data size of this dir as a metric gauge.
void DataDir::update_local_data_size(int64_t size) {
    disks_local_used_capacity->set_value(size);
}
916
917
347
// Publish the given remote (cold storage) data size of this dir as a metric gauge.
void DataDir::update_remote_data_size(int64_t size) {
    disks_remote_used_capacity->set_value(size);
}
920
921
0
// Number of tablets registered on this data dir; reads _tablet_set under _mutex.
size_t DataDir::tablet_size() const {
    std::lock_guard<std::mutex> l(_mutex);
    return _tablet_set.size();
}
925
926
25.5k
bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
927
25.5k
    double used_pct = get_usage(incoming_data_size);
928
25.5k
    int64_t left_bytes = _available_bytes - incoming_data_size;
929
25.5k
    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
930
25.5k
        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
931
0
        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
932
0
                     << ", left bytes: " << left_bytes << ", path: " << _path;
933
0
        return true;
934
0
    }
935
25.5k
    return false;
936
25.5k
}
937
938
2.72k
// Adjust this dir's compaction score metric by `delta` (may be negative).
void DataDir::disks_compaction_score_increment(int64_t delta) {
    disks_compaction_score->increment(delta);
}
941
942
2.72k
// Adjust this dir's running-compaction-count metric by `delta` (may be negative).
void DataDir::disks_compaction_num_increment(int64_t delta) {
    disks_compaction_num->increment(delta);
}
945
946
41.8k
// Move a tablet directory into this dir's trash, or delete it outright when
// the trash is disabled (trash_file_expire_time_sec <= 0). On success the
// now-empty parent (tablet_id-level) directory of `tablet_path` is removed too.
Status DataDir::move_to_trash(const std::string& tablet_path) {
    if (config::trash_file_expire_time_sec <= 0) {
        LOG(INFO) << "delete tablet dir " << tablet_path
                  << " directly due to trash_file_expire_time_sec is 0";
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
        return delete_tablet_parent_path_if_empty(tablet_path);
    }

    Status res = Status::OK();
    // 1. get timestamp string
    std::string time_str;
    if ((res = gen_timestamp_string(&time_str)) != Status::OK()) {
        LOG(WARNING) << "failed to generate time_string when move file to trash.err code=" << res;
        return res;
    }

    // 2. generate new file path
    // a global counter to avoid file name duplication.
    // Trash layout: <dir>/trash/<timestamp>.<counter>/<tablet_id>/<schema_hash>
    static std::atomic<uint64_t> delete_counter(0);
    auto trash_root_path =
            fmt::format("{}/{}/{}.{}", _path, TRASH_PREFIX, time_str, delete_counter++);
    auto fs_tablet_path = io::Path(tablet_path);
    auto trash_tablet_path = trash_root_path /
                             fs_tablet_path.parent_path().filename() /* tablet_id */ /
                             fs_tablet_path.filename() /* schema_hash */;

    // 3. create target dir, or the rename() function will fail.
    auto trash_tablet_parent = trash_tablet_path.parent_path();
    // create dir if not exists
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(trash_tablet_parent, &exists));
    if (!exists) {
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(trash_tablet_parent));
    }

    // 4. move tablet to trash
    // Uses POSIX rename(); presumably trash lives on the same filesystem as the
    // data dir so the move is a cheap metadata operation — TODO confirm.
    VLOG_NOTICE << "move file to trash. " << tablet_path << " -> " << trash_tablet_path;
    if (rename(tablet_path.c_str(), trash_tablet_path.c_str()) < 0) {
        return Status::Error<OS_ERROR>("move file to trash failed. file={}, target={}, err={}",
                                       tablet_path, trash_tablet_path.native(), Errno::str());
    }

    // 5. check parent dir of source file, delete it when empty
    RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path));

    return Status::OK();
}
993
994
41.8k
// Remove the parent (tablet_id-level) directory of `tablet_path` when it no
// longer contains any entries; leave it alone otherwise.
Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) {
    // `tablet_path` is ".../<tablet_id>/<schema_hash>"; its parent is the tablet_id dir.
    const std::string parent_dir = io::Path(tablet_path).parent_path();

    std::vector<io::FileInfo> children;
    bool exists = true;
    const auto& local_fs = io::global_local_filesystem();
    RETURN_IF_ERROR(local_fs->list(parent_dir, false, &children, &exists));
    if (!children.empty()) {
        return Status::OK();
    }

    LOG(INFO) << "remove empty dir " << parent_dir;
    RETURN_IF_ERROR(local_fs->delete_directory(parent_dir));
    return Status::OK();
}
1008
1009
48
// Process queued remote-rowset GC markers from the meta store: for each marker
// delete the rowset's remote segment files, then erase the marker on success
// (or when the marker itself is malformed). Markers whose storage resource is
// unavailable are kept so a later run can retry them.
void DataDir::perform_remote_rowset_gc() {
    std::vector<std::pair<std::string, std::string>> gc_kvs;
    auto traverse_remote_rowset_func = [&gc_kvs](std::string_view key,
                                                 std::string_view value) -> bool {
        gc_kvs.emplace_back(key, value);
        return true;
    };
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX,
                                     traverse_remote_rowset_func));
    std::vector<std::string> deleted_keys;
    for (auto& [key, val] : gc_kvs) {
        // Key layout: REMOTE_ROWSET_GC_PREFIX + rowset_id.
        auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
        RemoteRowsetGcPB gc_pb;
        if (!gc_pb.ParseFromString(val)) {
            // Unparseable marker: drop it so it is not retried forever.
            // NOTE: `key` is moved into deleted_keys and not used again this iteration.
            LOG(WARNING) << "malformed RemoteRowsetGcPB. rowset_id=" << rowset_id;
            deleted_keys.push_back(std::move(key));
            continue;
        }

        auto storage_resource = get_storage_resource(gc_pb.resource_id());
        if (!storage_resource) {
            // Keep the marker (no push to deleted_keys) so a later run retries.
            LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
            continue;
        }

        // Build the remote path of every segment file of this rowset.
        std::vector<io::Path> seg_paths;
        seg_paths.reserve(gc_pb.num_segments());
        for (int i = 0; i < gc_pb.num_segments(); ++i) {
            seg_paths.emplace_back(
                    storage_resource->first.remote_segment_path(gc_pb.tablet_id(), rowset_id, i));
        }

        auto& fs = storage_resource->first.fs;
        LOG(INFO) << "delete remote rowset. root_path=" << fs->root_path()
                  << ", rowset_id=" << rowset_id;
        auto st = fs->batch_delete(seg_paths);
        if (st.ok()) {
            deleted_keys.push_back(std::move(key));
            unused_remote_rowset_num << -1;
        } else {
            LOG(WARNING) << "failed to delete remote rowset. err=" << st;
        }
    }
    // Erase processed markers only after their remote data is gone (or the
    // marker was malformed), so a crash mid-loop never loses GC work.
    for (const auto& key : deleted_keys) {
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
    }
}
1056
1057
48
// Process queued remote-tablet GC markers from the meta store: for each marker
// delete the tablet's remote data directory on every listed storage resource,
// then erase the marker only if every deletion succeeded (malformed markers
// are erased immediately). Partially-failed markers are kept for retry.
void DataDir::perform_remote_tablet_gc() {
    std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
    auto traverse_remote_tablet_func = [&tablet_gc_kvs](std::string_view key,
                                                        std::string_view value) -> bool {
        tablet_gc_kvs.emplace_back(key, value);
        return true;
    };
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX,
                                     traverse_remote_tablet_func));
    std::vector<std::string> deleted_keys;
    for (auto& [key, val] : tablet_gc_kvs) {
        // Key layout: REMOTE_TABLET_GC_PREFIX + tablet_id.
        auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
        RemoteTabletGcPB gc_pb;
        if (!gc_pb.ParseFromString(val)) {
            // Unparseable marker: drop it so it is not retried forever.
            // NOTE: `key` is moved into deleted_keys and not used again this iteration.
            LOG(WARNING) << "malformed RemoteTabletGcPB. tablet_id=" << tablet_id;
            deleted_keys.push_back(std::move(key));
            continue;
        }
        // Attempt deletion on every resource; remember if any one fails.
        bool success = true;
        for (auto& resource_id : gc_pb.resource_ids()) {
            auto fs = get_filesystem(resource_id);
            if (!fs) {
                LOG(WARNING) << "could not get file system. resource_id=" << resource_id;
                success = false;
                continue;
            }
            LOG(INFO) << "delete remote rowsets of tablet. root_path=" << fs->root_path()
                      << ", tablet_id=" << tablet_id;
            auto st = fs->delete_directory(DATA_PREFIX + '/' + tablet_id);
            if (!st.ok()) {
                LOG(WARNING) << "failed to delete all remote rowset in tablet. err=" << st;
                success = false;
            }
        }
        if (success) {
            deleted_keys.push_back(std::move(key));
        }
    }
    // Erase markers only after all their remote data is gone, so a crash
    // mid-loop never loses GC work.
    for (const auto& key : deleted_keys) {
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
    }
}
1099
#include "common/compile_check_end.h"
1100
} // namespace doris