Coverage Report

Created: 2026-05-16 12:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/data_dir.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/data_dir.h"
19
20
#include <fmt/core.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <atomic>
27
#include <cstdio>
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <chrono> // IWYU pragma: keep
30
#include <cstddef>
31
#include <filesystem>
32
#include <map>
33
#include <memory>
34
#include <new>
35
#include <optional>
36
#include <roaring/roaring.hh>
37
#include <set>
38
#include <sstream>
39
#include <string>
40
#include <thread>
41
#include <utility>
42
43
#include "common/cast_set.h"
44
#include "common/config.h"
45
#include "common/logging.h"
46
#include "common/metrics/doris_metrics.h"
47
#include "io/fs/file_reader.h"
48
#include "io/fs/file_writer.h"
49
#include "io/fs/local_file_system.h"
50
#include "io/fs/path.h"
51
#include "service/backend_options.h"
52
#include "storage/delete/delete_handler.h"
53
#include "storage/olap_common.h"
54
#include "storage/olap_define.h"
55
#include "storage/olap_meta.h"
56
#include "storage/rowset/beta_rowset.h"
57
#include "storage/rowset/pending_rowset_helper.h"
58
#include "storage/rowset/rowset.h"
59
#include "storage/rowset/rowset_id_generator.h"
60
#include "storage/rowset/rowset_meta.h"
61
#include "storage/rowset/rowset_meta_manager.h"
62
#include "storage/storage_engine.h"
63
#include "storage/storage_policy.h"
64
#include "storage/tablet/tablet.h"
65
#include "storage/tablet/tablet_manager.h"
66
#include "storage/tablet/tablet_meta_manager.h"
67
#include "storage/txn/txn_manager.h"
68
#include "storage/utils.h" // for check_dir_existed
69
#include "util/string_util.h"
70
#include "util/uid_util.h"
71
72
namespace doris {
73
using namespace ErrorCode;
74
75
namespace {
76
77
115
Status read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id) {
78
115
    bool exists = false;
79
115
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
80
115
    *cluster_id = -1;
81
115
    if (exists) {
82
8
        io::FileReaderSPtr reader;
83
8
        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
84
8
        size_t fsize = reader->size();
85
8
        if (fsize > 0) {
86
8
            std::string content;
87
8
            content.resize(fsize, '\0');
88
8
            size_t bytes_read = 0;
89
8
            RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
90
8
            DCHECK_EQ(fsize, bytes_read);
91
8
            *cluster_id = std::stoi(content);
92
8
        }
93
8
    }
94
115
    return Status::OK();
95
115
}
96
97
2
Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id) {
98
2
    bool exists = false;
99
2
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(path, &exists));
100
2
    if (!exists) {
101
2
        io::FileWriterPtr file_writer;
102
2
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &file_writer));
103
2
        RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id)));
104
2
        RETURN_IF_ERROR(file_writer->close());
105
2
    }
106
2
    return Status::OK();
107
2
}
108
109
} // namespace
110
111
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_total_capacity, MetricUnit::BYTES);
112
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_avail_capacity, MetricUnit::BYTES);
113
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_local_used_capacity, MetricUnit::BYTES);
114
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_remote_used_capacity, MetricUnit::BYTES);
115
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_trash_used_capacity, MetricUnit::BYTES);
116
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_state, MetricUnit::BYTES);
117
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_score, MetricUnit::NOUNIT);
118
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(disks_compaction_num, MetricUnit::NOUNIT);
119
120
DataDir::DataDir(StorageEngine& engine, const std::string& path, int64_t capacity_bytes,
121
                 TStorageMedium::type storage_medium)
122
272
        : _engine(engine),
123
272
          _path(path),
124
272
          _available_bytes(0),
125
272
          _disk_capacity_bytes(0),
126
272
          _trash_used_bytes(0),
127
272
          _storage_medium(storage_medium),
128
272
          _is_used(false),
129
272
          _cluster_id(-1),
130
272
          _to_be_deleted(false) {
131
272
    _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
132
272
            std::string("data_dir.") + path, {{"path", path}});
133
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity);
134
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
135
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_local_used_capacity);
136
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_remote_used_capacity);
137
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_trash_used_capacity);
138
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
139
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
140
272
    INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);
141
272
}
142
143
267
DataDir::~DataDir() {
144
267
    DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity);
145
267
    delete _meta;
146
267
}
147
148
115
Status DataDir::init(bool init_meta) {
149
115
    bool exists = false;
150
115
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
151
115
    if (!exists) {
152
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
153
0
                                       "check file exist failed");
154
0
    }
155
156
115
    RETURN_NOT_OK_STATUS_WITH_WARN(update_capacity(), "update_capacity failed");
157
115
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
158
115
    RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity_and_create_shards(),
159
115
                                   "_init_capacity_and_create_shards failed");
160
115
    if (init_meta) {
161
115
        RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
162
115
    }
163
164
115
    _is_used = true;
165
115
    return Status::OK();
166
115
}
167
168
55
void DataDir::stop_bg_worker() {
169
55
    _stop_bg_worker = true;
170
55
}
171
172
115
Status DataDir::_init_cluster_id() {
173
115
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
174
115
    RETURN_IF_ERROR(read_cluster_id(cluster_id_path, &_cluster_id));
175
115
    if (_cluster_id == -1) {
176
107
        _cluster_id_incomplete = true;
177
107
    }
178
115
    return Status::OK();
179
115
}
180
181
115
Status DataDir::_init_capacity_and_create_shards() {
182
115
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
183
115
                                                                  &_available_bytes));
184
115
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
185
115
    bool exists = false;
186
115
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(data_path, &exists));
187
115
    if (!exists) {
188
107
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(data_path));
189
107
    }
190
117k
    for (int i = 0; i < MAX_SHARD_NUM; ++i) {
191
117k
        auto shard_path = fmt::format("{}/{}", data_path, i);
192
117k
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(shard_path, &exists));
193
117k
        if (!exists) {
194
109k
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(shard_path));
195
109k
        }
196
117k
    }
197
198
115
    return Status::OK();
199
115
}
200
201
116
Status DataDir::_init_meta() {
202
    // init path hash
203
116
    _path_hash = hash_of_path(BackendOptions::get_localhost(), _path);
204
116
    LOG(INFO) << "path: " << _path << ", hash: " << _path_hash;
205
206
    // init meta
207
116
    _meta = new (std::nothrow) OlapMeta(_path);
208
116
    if (_meta == nullptr) {
209
0
        RETURN_NOT_OK_STATUS_WITH_WARN(
210
0
                Status::MemoryAllocFailed("allocate memory for OlapMeta failed"),
211
0
                "new OlapMeta failed");
212
0
    }
213
116
    Status res = _meta->init();
214
116
    if (!res.ok()) {
215
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("open rocksdb failed, path={}", _path),
216
0
                                       "init OlapMeta failed");
217
0
    }
218
116
    return Status::OK();
219
116
}
220
221
2
Status DataDir::set_cluster_id(int32_t cluster_id) {
222
2
    if (_cluster_id != -1 && _cluster_id != cluster_id) {
223
0
        LOG(ERROR) << "going to set cluster id to already assigned store, cluster_id="
224
0
                   << _cluster_id << ", new_cluster_id=" << cluster_id;
225
0
        return Status::InternalError("going to set cluster id to already assigned store");
226
0
    }
227
2
    if (!_cluster_id_incomplete) {
228
0
        return Status::OK();
229
0
    }
230
2
    auto cluster_id_path = fmt::format("{}/{}", _path, CLUSTER_ID_PREFIX);
231
2
    return _write_cluster_id_to_path(cluster_id_path, cluster_id);
232
2
}
233
234
1.78k
void DataDir::health_check() {
235
    // check disk
236
1.78k
    if (_is_used) {
237
1.78k
        Status res = _read_and_write_test_file();
238
1.78k
        if (!res && res.is<IO_ERROR>()) {
239
0
            LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path
240
0
                         << ", err: " << res;
241
0
            _engine.add_broken_path(_path);
242
0
            _is_used = !res.is<IO_ERROR>();
243
0
        }
244
1.78k
    }
245
1.78k
    disks_state->set_value(_is_used ? 1 : 0);
246
1.78k
}
247
248
1.78k
Status DataDir::_read_and_write_test_file() {
249
1.78k
    auto test_file = fmt::format("{}/{}", _path, kTestFilePath);
250
1.78k
    return read_write_test_file(test_file);
251
1.78k
}
252
253
285k
void DataDir::register_tablet(Tablet* tablet) {
254
285k
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
255
256
285k
    std::lock_guard<std::mutex> l(_mutex);
257
285k
    _tablet_set.emplace(std::move(tablet_info));
258
285k
}
259
260
5.08k
void DataDir::deregister_tablet(Tablet* tablet) {
261
5.08k
    TabletInfo tablet_info(tablet->tablet_id(), tablet->tablet_uid());
262
263
5.08k
    std::lock_guard<std::mutex> l(_mutex);
264
5.08k
    _tablet_set.erase(tablet_info);
265
5.08k
}
266
267
0
void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) {
268
0
    std::lock_guard<std::mutex> l(_mutex);
269
270
0
    tablet_infos->insert(tablet_infos->end(), _tablet_set.begin(), _tablet_set.end());
271
0
    _tablet_set.clear();
272
0
}
273
274
0
std::string DataDir::get_absolute_shard_path(int64_t shard_id) {
275
0
    return fmt::format("{}/{}/{}", _path, DATA_PREFIX, shard_id);
276
0
}
277
278
std::string DataDir::get_absolute_tablet_path(int64_t shard_id, int64_t tablet_id,
279
0
                                              int32_t schema_hash) {
280
0
    return fmt::format("{}/{}/{}", get_absolute_shard_path(shard_id), tablet_id, schema_hash);
281
0
}
282
283
0
void DataDir::find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* paths) {
284
    // path: /root_path/trash/time_label/tablet_id/schema_hash
285
0
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
286
0
    bool exists = true;
287
0
    std::vector<io::FileInfo> sub_dirs;
288
0
    Status st = io::global_local_filesystem()->list(trash_path, false, &sub_dirs, &exists);
289
0
    if (!st) {
290
0
        return;
291
0
    }
292
293
0
    for (auto& sub_dir : sub_dirs) {
294
        // sub dir is time_label
295
0
        if (sub_dir.is_file) {
296
0
            continue;
297
0
        }
298
0
        auto sub_path = fmt::format("{}/{}", trash_path, sub_dir.file_name);
299
0
        auto tablet_path = fmt::format("{}/{}", sub_path, tablet_id);
300
0
        st = io::global_local_filesystem()->exists(tablet_path, &exists);
301
0
        if (st && exists) {
302
0
            paths->emplace_back(std::move(tablet_path));
303
0
        }
304
0
    }
305
0
}
306
307
std::string DataDir::get_root_path_from_schema_hash_path_in_trash(
308
0
        const std::string& schema_hash_dir_in_trash) {
309
0
    return io::Path(schema_hash_dir_in_trash)
310
0
            .parent_path()
311
0
            .parent_path()
312
0
            .parent_path()
313
0
            .parent_path()
314
0
            .string();
315
0
}
316
317
60
Status DataDir::_check_incompatible_old_format_tablet() {
318
60
    auto check_incompatible_old_func = [](int64_t tablet_id, int32_t schema_hash,
319
60
                                          std::string_view value) -> bool {
320
        // if strict check incompatible old format, then log fatal
321
0
        if (config::storage_strict_check_incompatible_old_format) {
322
0
            throw Exception(Status::FatalError(
323
0
                    "There are incompatible old format metas, current version does not support and "
324
0
                    "it may lead to data missing!!! tablet_id = {} schema_hash = {}",
325
0
                    tablet_id, schema_hash));
326
0
        } else {
327
0
            LOG(WARNING)
328
0
                    << "There are incompatible old format metas, current version does not support "
329
0
                    << "and it may lead to data missing!!! "
330
0
                    << "tablet_id = " << tablet_id << " schema_hash = " << schema_hash;
331
0
        }
332
0
        return false;
333
0
    };
334
335
    // seek old header prefix. when check_incompatible_old_func is called, it has old format in olap_meta
336
60
    Status check_incompatible_old_status = TabletMetaManager::traverse_headers(
337
60
            _meta, check_incompatible_old_func, OLD_HEADER_PREFIX);
338
60
    if (!check_incompatible_old_status) {
339
0
        LOG(WARNING) << "check incompatible old format meta fails, it may lead to data missing!!! "
340
0
                     << _path;
341
60
    } else {
342
60
        LOG(INFO) << "successfully check incompatible old format meta " << _path;
343
60
    }
344
60
    return check_incompatible_old_status;
345
60
}
346
347
// TODO(ygl): deal with rowsets and tablets when load failed
348
60
Status DataDir::load() {
349
60
    LOG(INFO) << "start to load tablets from " << _path;
350
351
    // load rowset meta from meta env and create rowset
352
    // COMMITTED: add to txn manager
353
    // VISIBLE: add to tablet
354
    // if one rowset load failed, then the total data dir will not be loaded
355
356
    // necessarily check incompatible old format. when there are old metas, it may load to data missing
357
60
    RETURN_IF_ERROR(_check_incompatible_old_format_tablet());
358
359
60
    std::vector<RowsetMetaSharedPtr> dir_rowset_metas;
360
60
    LOG(INFO) << "begin loading rowset from meta";
361
60
    auto load_rowset_func = [&dir_rowset_metas, this](TabletUid tablet_uid, RowsetId rowset_id,
362
2.64k
                                                      std::string_view meta_str) -> bool {
363
2.64k
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
364
2.64k
        bool parsed = rowset_meta->init(meta_str);
365
2.64k
        if (!parsed) {
366
0
            LOG(WARNING) << "parse rowset meta string failed for rowset_id:" << rowset_id;
367
            // return false will break meta iterator, return true to skip this error
368
0
            return true;
369
0
        }
370
371
2.64k
        if (rowset_meta->has_delete_predicate()) {
372
            // copy the delete sub pred v1 to check then
373
100
            auto orig_delete_sub_pred = rowset_meta->delete_predicate().sub_predicates();
374
100
            auto* delete_pred = rowset_meta->mutable_delete_pred_pb();
375
376
100
            if ((!delete_pred->sub_predicates().empty() &&
377
100
                 delete_pred->sub_predicates_v2().empty()) ||
378
100
                (!delete_pred->in_predicates().empty() &&
379
100
                 delete_pred->in_predicates()[0].has_column_unique_id())) {
380
                // convert pred and write only when delete sub pred v2 is not set or there is in list pred to be set column uid
381
32
                RETURN_IF_ERROR(DeleteHandler::convert_to_sub_pred_v2(
382
32
                        delete_pred, rowset_meta->tablet_schema()));
383
32
                LOG(INFO) << fmt::format(
384
32
                        "convert rowset with old delete pred: rowset_id={}, tablet_id={}",
385
32
                        rowset_id.to_string(), tablet_uid.to_string());
386
32
                CHECK_EQ(orig_delete_sub_pred.size(), delete_pred->sub_predicates().size())
387
0
                        << "inconsistent sub predicate v1 after conversion";
388
32
                for (int i = 0; i < orig_delete_sub_pred.size(); ++i) {
389
0
                    CHECK_STREQ(orig_delete_sub_pred.Get(i).c_str(),
390
0
                                delete_pred->sub_predicates().Get(i).c_str())
391
0
                            << "inconsistent sub predicate v1 after conversion";
392
0
                }
393
32
                std::string result;
394
32
                rowset_meta->serialize(&result);
395
32
                std::string key =
396
32
                        ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
397
32
                RETURN_IF_ERROR(_meta->put(META_COLUMN_FAMILY_INDEX, key, result));
398
32
            }
399
100
        }
400
401
2.64k
        if (rowset_meta->partition_id() == 0) {
402
0
            LOG(WARNING) << "rs tablet=" << rowset_meta->tablet_id() << " rowset_id=" << rowset_id
403
0
                         << " load from meta but partition id eq 0";
404
0
        }
405
406
2.64k
        dir_rowset_metas.push_back(rowset_meta);
407
2.64k
        return true;
408
2.64k
    };
409
60
    MonotonicStopWatch rs_timer;
410
60
    rs_timer.start();
411
60
    Status load_rowset_status = RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
412
60
    rs_timer.stop();
413
60
    if (!load_rowset_status) {
414
0
        LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
415
60
    } else {
416
60
        LOG(INFO) << "load rowset from meta finished, cost: "
417
60
                  << rs_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
418
60
    }
419
420
    // load tablet
421
    // create tablet from tablet meta and add it to tablet mgr
422
60
    LOG(INFO) << "begin loading tablet from meta";
423
60
    std::set<int64_t> tablet_ids;
424
60
    std::set<int64_t> failed_tablet_ids;
425
60
    auto load_tablet_func = [this, &tablet_ids, &failed_tablet_ids](
426
60
                                    int64_t tablet_id, int32_t schema_hash,
427
278k
                                    std::string_view value) -> bool {
428
278k
        Status status = _engine.tablet_manager()->load_tablet_from_meta(
429
278k
                this, tablet_id, schema_hash, value, false, false, false, false);
430
278k
        if (!status.ok() && !status.is<TABLE_ALREADY_DELETED_ERROR>() &&
431
278k
            !status.is<ENGINE_INSERT_OLD_TABLET>()) {
432
            // load_tablet_from_meta() may return Status::Error<TABLE_ALREADY_DELETED_ERROR>()
433
            // which means the tablet status is DELETED
434
            // This may happen when the tablet was just deleted before the BE restarted,
435
            // but it has not been cleared from rocksdb. At this time, restarting the BE
436
            // will read the tablet in the DELETE state from rocksdb. These tablets have been
437
            // added to the garbage collection queue and will be automatically deleted afterwards.
438
            // Therefore, we believe that this situation is not a failure.
439
440
            // Besides, load_tablet_from_meta() may return Status::Error<ENGINE_INSERT_OLD_TABLET>()
441
            // when BE is restarting and the older tablet have been added to the
442
            // garbage collection queue but not deleted yet.
443
            // In this case, since the data_dirs are parallel loaded, a later loaded tablet
444
            // may be older than previously loaded one, which should not be acknowledged as a
445
            // failure.
446
0
            LOG(WARNING) << "load tablet from header failed. status:" << status
447
0
                         << ", tablet=" << tablet_id << "." << schema_hash;
448
0
            failed_tablet_ids.insert(tablet_id);
449
278k
        } else {
450
278k
            tablet_ids.insert(tablet_id);
451
278k
        }
452
278k
        return true;
453
278k
    };
454
60
    MonotonicStopWatch tablet_timer;
455
60
    tablet_timer.start();
456
60
    Status load_tablet_status = TabletMetaManager::traverse_headers(_meta, load_tablet_func);
457
60
    tablet_timer.stop();
458
60
    if (!failed_tablet_ids.empty()) {
459
0
        LOG(WARNING) << "load tablets from header failed"
460
0
                     << ", loaded tablet: " << tablet_ids.size()
461
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
462
0
        if (!config::ignore_load_tablet_failure) {
463
0
            throw Exception(Status::FatalError(
464
0
                    "load tablets encounter failure. stop BE process. path: {}", _path));
465
0
        }
466
0
    }
467
60
    if (!load_tablet_status) {
468
0
        LOG(WARNING) << "there is failure when loading tablet headers"
469
0
                     << ", loaded tablet: " << tablet_ids.size()
470
0
                     << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
471
60
    } else {
472
60
        LOG(INFO) << "load tablet from meta finished"
473
60
                  << ", loaded tablet: " << tablet_ids.size()
474
60
                  << ", error tablet: " << failed_tablet_ids.size()
475
60
                  << ", cost: " << tablet_timer.elapsed_time_milliseconds()
476
60
                  << " ms, path: " << _path;
477
60
    }
478
479
278k
    for (int64_t tablet_id : tablet_ids) {
480
278k
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
481
278k
        if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
482
0
            RETURN_IF_ERROR(TabletMetaManager::save(this, tablet->tablet_id(),
483
0
                                                    tablet->schema_hash(), tablet->tablet_meta()));
484
0
        }
485
278k
    }
486
487
60
    auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id,
488
60
                                                              int64_t publish_version,
489
60
                                                              std::string_view info) {
490
0
        PendingPublishInfoPB pending_publish_info_pb;
491
0
        bool parsed =
492
0
                pending_publish_info_pb.ParseFromArray(info.data(), cast_set<int>(info.size()));
493
0
        if (!parsed) {
494
0
            LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
495
0
                         << " publish_version: " << publish_version;
496
0
        }
497
0
        engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
498
0
                                      publish_version, pending_publish_info_pb.transaction_id(),
499
0
                                      true, pending_publish_info_pb.commit_tso());
500
0
        return true;
501
0
    };
502
60
    MonotonicStopWatch pending_publish_timer;
503
60
    pending_publish_timer.start();
504
60
    RETURN_IF_ERROR(
505
60
            TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func));
506
60
    pending_publish_timer.stop();
507
60
    LOG(INFO) << "load pending publish task from meta finished, cost: "
508
60
              << pending_publish_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
509
510
60
    int64_t rowset_partition_id_eq_0_num = 0;
511
2.64k
    for (auto rowset_meta : dir_rowset_metas) {
512
2.64k
        if (rowset_meta->partition_id() == 0) {
513
0
            ++rowset_partition_id_eq_0_num;
514
0
        }
515
2.64k
    }
516
60
    if (rowset_partition_id_eq_0_num > config::ignore_invalid_partition_id_rowset_num) {
517
0
        throw Exception(Status::FatalError(
518
0
                "rowset partition id eq 0 is {} bigger than config {}, be exit, plz check be.INFO",
519
0
                rowset_partition_id_eq_0_num, config::ignore_invalid_partition_id_rowset_num));
520
0
    }
521
522
60
    std::map<RowsetId, std::vector<RowsetMetaSharedPtr>> rowset_id_to_row_binlog_metas;
523
60
    int64_t row_binlog_cnt {0};
524
60
    int64_t invalid_row_binlog_cnt {0};
525
60
    auto load_row_binlog_meta_func =
526
60
            [&rowset_id_to_row_binlog_metas, &row_binlog_cnt, &invalid_row_binlog_cnt](
527
60
                    const TabletUid& tablet_uid, const RowsetId& rowset_id,
528
64
                    const RowsetId& row_binlog_rowset_id, const std::string& val) -> bool {
529
64
        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
530
64
        bool parsed = rowset_meta->init(val);
531
64
        if (!parsed) {
532
0
            LOG(WARNING) << "parse binlog<row> meta string failed, tablet_uid=" << tablet_uid
533
0
                         << ", rowset_id=" << rowset_id
534
0
                         << ", row_binlog_rowset_id=" << row_binlog_rowset_id;
535
0
            ++invalid_row_binlog_cnt;
536
0
            return true;
537
0
        }
538
64
        DCHECK(rowset_meta->is_row_binlog());
539
64
        DCHECK_EQ(rowset_meta->tablet_uid(), tablet_uid);
540
64
        rowset_id_to_row_binlog_metas[rowset_id].emplace_back(std::move(rowset_meta));
541
64
        ++row_binlog_cnt;
542
64
        return true;
543
64
    };
544
545
60
    MonotonicStopWatch rb_timer;
546
60
    rb_timer.start();
547
60
    RETURN_IF_ERROR(RowsetMetaManager::traverse_row_binlog_metas(_meta, load_row_binlog_meta_func));
548
60
    rb_timer.stop();
549
60
    LOG(INFO) << "load binlog<row> meta finished, cost: " << rb_timer.elapsed_time_milliseconds()
550
60
              << " ms, data dir: " << _path << ", count: " << row_binlog_cnt
551
60
              << ", invalid: " << invalid_row_binlog_cnt;
552
553
    // traverse rowset
554
    // 1. add committed rowset to txn map
555
    // 2. add visible rowset to tablet
556
    // ignore any errors when load tablet or rowset, because fe will repair them after report
557
60
    int64_t invalid_rowset_counter = 0;
558
2.64k
    for (auto&& rowset_meta : dir_rowset_metas) {
559
2.64k
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(rowset_meta->tablet_id());
560
        // tablet maybe dropped, but not drop related rowset meta
561
2.64k
        if (tablet == nullptr) {
562
20
            VLOG_NOTICE << "could not find tablet id: " << rowset_meta->tablet_id()
563
0
                        << ", schema hash: " << rowset_meta->tablet_schema_hash()
564
0
                        << ", for rowset: " << rowset_meta->rowset_id() << ", skip this rowset";
565
20
            ++invalid_rowset_counter;
566
20
            continue;
567
20
        }
568
569
2.62k
        if (rowset_meta->partition_id() == 0) {
570
0
            LOG(WARNING) << "skip tablet_id=" << tablet->tablet_id()
571
0
                         << " rowset: " << rowset_meta->rowset_id()
572
0
                         << " txn: " << rowset_meta->txn_id();
573
0
            continue;
574
0
        }
575
576
2.62k
        std::vector<RowsetSharedPtr> attach_rowsets;
577
2.62k
        std::map<RowsetId, RowsetMetaPB> attach_rowset_map;
578
2.62k
        bool invalid_attach_rowset = false;
579
2.62k
        if (auto it = rowset_id_to_row_binlog_metas.find(rowset_meta->rowset_id());
580
2.62k
            it != rowset_id_to_row_binlog_metas.end()) {
581
64
            for (auto& attach_rowset_meta : it->second) {
582
64
                DCHECK_EQ(attach_rowset_meta->rowset_state(), rowset_meta->rowset_state());
583
64
                if (!attach_rowset_meta->tablet_schema()) {
584
0
                    attach_rowset_meta->set_tablet_schema(tablet->row_binlog_tablet_schema());
585
0
                }
586
587
64
                RowsetSharedPtr attach_rowset;
588
64
                Status attach_create_status =
589
64
                        tablet->create_rowset(attach_rowset_meta, &attach_rowset);
590
64
                if (!attach_create_status.ok()) {
591
0
                    LOG(WARNING) << "could not create rowset from binlog<row> rowset meta: "
592
0
                                 << " rowset_id: " << attach_rowset_meta->rowset_id()
593
0
                                 << " rowset_type: " << attach_rowset_meta->rowset_type()
594
0
                                 << " rowset_state: " << attach_rowset_meta->rowset_state();
595
0
                    invalid_attach_rowset = true;
596
0
                    break;
597
0
                }
598
64
                attach_rowset_map.emplace(attach_rowset_meta->rowset_id(),
599
64
                                          attach_rowset_meta->get_rowset_pb());
600
64
                attach_rowsets.emplace_back(std::move(attach_rowset));
601
64
            }
602
64
        }
603
2.62k
        if (invalid_attach_rowset) {
604
0
            continue;
605
0
        }
606
607
2.62k
        RowsetSharedPtr rowset;
608
2.62k
        Status create_status = tablet->create_rowset(rowset_meta, &rowset);
609
2.62k
        if (!create_status) {
610
0
            LOG(WARNING) << "could not create rowset from rowsetmeta: "
611
0
                         << " rowset_id: " << rowset_meta->rowset_id()
612
0
                         << " rowset_type: " << rowset_meta->rowset_type()
613
0
                         << " rowset_state: " << rowset_meta->rowset_state();
614
0
            continue;
615
0
        }
616
617
2.62k
        std::optional<BinlogFormatPB> binlog_format = std::nullopt;
618
2.62k
        const std::map<RowsetId, RowsetMetaPB>* attach_rowset_map_ptr = nullptr;
619
2.62k
        if (!attach_rowset_map.empty()) {
620
64
            binlog_format = BinlogFormatPB::ROW;
621
64
            attach_rowset_map_ptr = &attach_rowset_map;
622
64
        }
623
624
2.62k
        if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
625
2.62k
            rowset_meta->tablet_uid() == tablet->tablet_uid()) {
626
58
            if (!rowset_meta->tablet_schema()) {
627
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
628
0
                RETURN_IF_ERROR(RowsetMetaManager::save(
629
0
                        _meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
630
0
                        rowset_meta->get_rowset_pb(), binlog_format, attach_rowset_map_ptr));
631
0
            }
632
58
            std::vector<RowsetId> rowset_ids {rowset_meta->rowset_id()};
633
58
            for (const auto& attach_rowset : attach_rowsets) {
634
0
                if (attach_rowset != nullptr) {
635
0
                    rowset_ids.emplace_back(attach_rowset->rowset_id());
636
0
                }
637
0
            }
638
58
            Status commit_txn_status = _engine.txn_manager()->commit_txn(
639
58
                    _meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
640
58
                    rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
641
58
                    rowset, _engine.pending_local_rowsets().add(rowset_ids), true, nullptr,
642
58
                    attach_rowsets.empty() ? nullptr : &attach_rowsets);
643
58
            if (commit_txn_status || commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
644
58
                LOG(INFO) << "successfully to add committed rowset: " << rowset_meta->rowset_id()
645
58
                          << " to tablet: " << rowset_meta->tablet_id()
646
58
                          << " schema hash: " << rowset_meta->tablet_schema_hash()
647
58
                          << " for txn: " << rowset_meta->txn_id() << ", binlog<row> rowset: "
648
58
                          << (attach_rowsets.empty() ? "0"
649
58
                                                     : attach_rowsets[0]->rowset_id().to_string());
650
651
58
            } else if (commit_txn_status.is<ErrorCode::INTERNAL_ERROR>()) {
652
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
653
0
                             << " to tablet: " << rowset_meta->tablet_id()
654
0
                             << " for txn: " << rowset_meta->txn_id()
655
0
                             << " error: " << commit_txn_status << ", binlog<row> rowset: "
656
0
                             << (attach_rowsets.empty()
657
0
                                         ? "0"
658
0
                                         : attach_rowsets[0]->rowset_id().to_string());
659
0
                return commit_txn_status;
660
0
            } else {
661
0
                LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
662
0
                             << " to tablet: " << rowset_meta->tablet_id()
663
0
                             << " for txn: " << rowset_meta->txn_id()
664
0
                             << " error: " << commit_txn_status << ", binlog<row> rowset: "
665
0
                             << (attach_rowsets.empty()
666
0
                                         ? "0"
667
0
                                         : attach_rowsets[0]->rowset_id().to_string());
668
0
            }
669
2.56k
        } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
670
2.56k
                   rowset_meta->tablet_uid() == tablet->tablet_uid()) {
671
2.56k
            if (!rowset_meta->tablet_schema()) {
672
0
                rowset_meta->set_tablet_schema(tablet->tablet_schema());
673
0
                RETURN_IF_ERROR(RowsetMetaManager::save(
674
0
                        _meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
675
0
                        rowset_meta->get_rowset_pb(), binlog_format, attach_rowset_map_ptr));
676
0
            }
677
2.56k
            DCHECK_LE(attach_rowsets.size(), 1);
678
2.56k
            Status publish_status = tablet->add_rowset(
679
2.56k
                    rowset, attach_rowsets.empty() ? nullptr : attach_rowsets[0]);
680
2.56k
            if (!publish_status && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
681
0
                LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
682
0
                             << rowset->rowset_id() << " tablet id: " << rowset_meta->tablet_id()
683
0
                             << " txn id:" << rowset_meta->txn_id()
684
0
                             << " start_version: " << rowset_meta->version().first
685
0
                             << " end_version: " << rowset_meta->version().second
686
0
                             << ", binlog<row> rowset: "
687
0
                             << (attach_rowsets.empty()
688
0
                                         ? "0"
689
0
                                         : attach_rowsets[0]->rowset_id().to_string());
690
0
            }
691
2.56k
        } else {
692
0
            LOG(WARNING) << "find invalid rowset: " << rowset_meta->rowset_id()
693
0
                         << " with tablet id: " << rowset_meta->tablet_id()
694
0
                         << " tablet uid: " << rowset_meta->tablet_uid()
695
0
                         << " schema hash: " << rowset_meta->tablet_schema_hash()
696
0
                         << " txn: " << rowset_meta->txn_id()
697
0
                         << " current valid tablet uid: " << tablet->tablet_uid()
698
0
                         << ", binlog<row> rowset: "
699
0
                         << (attach_rowsets.empty() ? "0"
700
0
                                                    : attach_rowsets[0]->rowset_id().to_string());
701
0
            ++invalid_rowset_counter;
702
0
        }
703
2.62k
    }
704
705
60
    int64_t dbm_cnt {0};
706
60
    int64_t unknown_dbm_cnt {0};
707
60
    auto load_delete_bitmap_func = [this, &dbm_cnt, &unknown_dbm_cnt](int64_t tablet_id,
708
60
                                                                      int64_t version,
709
90
                                                                      std::string_view val) {
710
90
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
711
90
        if (!tablet) {
712
0
            return true;
713
0
        }
714
90
        const auto& all_data_rowsets = tablet->tablet_meta()->all_rs_metas();
715
90
        const auto& all_row_binlogs = tablet->tablet_meta()->all_row_binlog_rs_metas();
716
90
        RowsetIdUnorderedSet rowset_ids;
717
370
        for (const auto& [_, rowset_meta] : all_data_rowsets) {
718
370
            rowset_ids.insert(rowset_meta->rowset_id());
719
370
        }
720
136
        for (auto& [_, rowset_meta] : all_row_binlogs) {
721
136
            rowset_ids.insert(rowset_meta->rowset_id());
722
136
        }
723
724
90
        DeleteBitmapPB delete_bitmap_pb;
725
90
        delete_bitmap_pb.ParseFromArray(val.data(), cast_set<int>(val.size()));
726
90
        int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
727
90
        int seg_ids_size = delete_bitmap_pb.segment_ids_size();
728
90
        int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
729
90
        int binlog_mark_size = delete_bitmap_pb.is_binlog_delvec_size();
730
90
        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);
731
90
        CHECK(binlog_mark_size == 0 || binlog_mark_size == rst_ids_size);
732
733
342
        for (int i = 0; i < rst_ids_size; ++i) {
734
252
            RowsetId rst_id;
735
252
            rst_id.init(delete_bitmap_pb.rowset_ids(i));
736
            // only process the rowset in _rs_metas and _row_binlog_rs_metas
737
252
            if (rowset_ids.find(rst_id) == rowset_ids.end()) {
738
0
                ++unknown_dbm_cnt;
739
0
                continue;
740
0
            }
741
252
            ++dbm_cnt;
742
252
            auto seg_id = delete_bitmap_pb.segment_ids(i);
743
252
            auto iter = tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
744
252
                    {rst_id, seg_id, version});
745
            // This version of delete bitmap already exists
746
252
            if (iter != tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
747
4
                continue;
748
4
            }
749
248
            auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
750
751
248
            bool from_binlog = delete_bitmap_pb.is_binlog_delvec_size() > 0
752
248
                                       ? delete_bitmap_pb.is_binlog_delvec(i)
753
248
                                       : false;
754
248
            if (!from_binlog) {
755
236
                tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
756
236
                        roaring::Roaring::read(bitmap);
757
236
            } else {
758
12
                tablet->tablet_meta()->binlog_delvec().delete_bitmap[{rst_id, seg_id, version}] =
759
12
                        roaring::Roaring::read(bitmap);
760
12
            }
761
248
            VLOG_ROW << "successfully to add delete_bitmap, tablet_id=" << tablet->tablet_id()
762
0
                     << ", rowset_id=" << rst_id << ", seg_id=" << seg_id << ", version=" << version
763
0
                     << ", from_binlog=" << from_binlog;
764
248
        }
765
90
        return true;
766
90
    };
767
60
    MonotonicStopWatch dbm_timer;
768
60
    dbm_timer.start();
769
60
    RETURN_IF_ERROR(TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func));
770
60
    dbm_timer.stop();
771
772
60
    LOG(INFO) << "load delete bitmap from meta finished, cost: "
773
60
              << dbm_timer.elapsed_time_milliseconds() << " ms, data dir: " << _path;
774
775
    // At startup, we only count these invalid rowset, but do not actually delete it.
776
    // The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
777
    // which is cleaned up uniformly by the background cleanup thread.
778
60
    LOG(INFO) << "finish to load tablets from " << _path
779
60
              << ", total rowset meta: " << dir_rowset_metas.size()
780
60
              << ", invalid rowset num: " << invalid_rowset_counter
781
60
              << ", visible/stale rowsets' delete bitmap count: " << dbm_cnt
782
60
              << ", invalid rowsets' delete bitmap count: " << unknown_dbm_cnt;
783
784
60
    return Status::OK();
785
60
}
786
787
// gc unused local tablet dir
788
281k
void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) {
789
281k
    if (_stop_bg_worker) {
790
0
        return;
791
0
    }
792
793
281k
    TTabletId tablet_id = -1;
794
281k
    TSchemaHash schema_hash = -1;
795
281k
    bool is_valid = TabletManager::get_tablet_id_and_schema_hash_from_path(
796
281k
            tablet_schema_hash_path, &tablet_id, &schema_hash);
797
281k
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
798
0
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
799
0
        return;
800
0
    }
801
802
281k
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
803
281k
    if (!tablet || tablet->data_dir() != this) {
804
284
        if (tablet) {
805
0
            LOG(INFO) << "The tablet in path " << tablet_schema_hash_path
806
0
                      << " is not same with the running one: " << tablet->tablet_path()
807
0
                      << ", might be the old tablet after migration, try to move it to trash";
808
0
        }
809
284
        _engine.tablet_manager()->try_delete_unused_tablet_path(this, tablet_id, schema_hash,
810
284
                                                                tablet_schema_hash_path, shard_id);
811
284
        return;
812
284
    }
813
814
281k
    _perform_rowset_gc(tablet_schema_hash_path);
815
281k
}
816
817
// gc unused local rowsets under tablet dir
818
281k
void DataDir::_perform_rowset_gc(const std::string& tablet_schema_hash_path) {
819
281k
    if (_stop_bg_worker) {
820
0
        return;
821
0
    }
822
823
281k
    TTabletId tablet_id = -1;
824
281k
    TSchemaHash schema_hash = -1;
825
281k
    bool is_valid = doris::TabletManager::get_tablet_id_and_schema_hash_from_path(
826
281k
            tablet_schema_hash_path, &tablet_id, &schema_hash);
827
281k
    if (!is_valid || tablet_id < 1 || schema_hash < 1) [[unlikely]] {
828
0
        LOG(WARNING) << "[path gc] unknown path: " << tablet_schema_hash_path;
829
0
        return;
830
0
    }
831
832
281k
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
833
281k
    if (!tablet) {
834
        // Could not found the tablet, maybe it's a dropped tablet, will be reclaimed
835
        // in the next time `_perform_path_gc_by_tablet`
836
0
        return;
837
0
    }
838
839
281k
    if (tablet->data_dir() != this) {
840
        // Current running tablet is not in same data_dir, maybe it's a tablet after migration,
841
        // will be reclaimed in the next time `_perform_path_gc_by_tablet`
842
0
        return;
843
0
    }
844
845
281k
    bool exists;
846
281k
    std::vector<io::FileInfo> files;
847
281k
    auto st = io::global_local_filesystem()->list(tablet_schema_hash_path, true, &files, &exists);
848
281k
    if (!st.ok()) [[unlikely]] {
849
0
        LOG(WARNING) << "[path gc] fail to list tablet path " << tablet_schema_hash_path << " : "
850
0
                     << st;
851
0
        return;
852
0
    }
853
854
    // Rowset files excluding pending rowsets
855
281k
    std::vector<std::pair<RowsetId, std::string /* filename */>> rowsets_not_pending;
856
281k
    for (auto&& file : files) {
857
75.9k
        auto rowset_id = extract_rowset_id(file.file_name);
858
75.9k
        if (rowset_id.hi == 0) {
859
124
            continue; // Not a rowset
860
124
        }
861
862
75.7k
        if (_engine.pending_local_rowsets().contains(rowset_id)) {
863
110
            continue; // Pending rowset file
864
110
        }
865
866
75.6k
        rowsets_not_pending.emplace_back(rowset_id, std::move(file.file_name));
867
75.6k
    }
868
869
281k
    RowsetIdUnorderedSet rowsets_in_version_map;
870
281k
    tablet->traverse_rowsets(
871
485k
            [&rowsets_in_version_map](auto& rs) { rowsets_in_version_map.insert(rs->rowset_id()); },
872
281k
            true);
873
281k
    for (const auto& [_, rb_meta] : tablet->tablet_meta()->all_row_binlog_rs_metas()) {
874
92
        rowsets_in_version_map.insert(rb_meta->rowset_id());
875
92
    }
876
877
281k
    DBUG_EXECUTE_IF("DataDir::_perform_rowset_gc.simulation.slow", {
878
281k
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
879
281k
        if (target_tablet_id == tablet_id) {
880
281k
            LOG(INFO) << "debug point wait tablet to remove rsmgr tabletId=" << tablet_id;
881
281k
            DBUG_BLOCK;
882
281k
        }
883
281k
    });
884
885
281k
    auto reclaim_rowset_file = [](const std::string& path) {
886
426
        auto st = io::global_local_filesystem()->delete_file(path);
887
426
        if (!st.ok()) [[unlikely]] {
888
0
            LOG(WARNING) << "[path gc] failed to delete garbage rowset file: " << st;
889
0
            return;
890
0
        }
891
426
        LOG(INFO) << "[path gc] delete garbage path: " << path; // Audit log
892
426
    };
893
894
281k
    auto should_reclaim = [&, this](const RowsetId& rowset_id) {
895
66.6k
        return !rowsets_in_version_map.contains(rowset_id) &&
896
66.6k
               !_engine.check_rowset_id_in_unused_rowsets(rowset_id) &&
897
66.6k
               RowsetMetaManager::exists(get_meta(), tablet->tablet_uid(), rowset_id)
898
604
                       .is<META_KEY_NOT_FOUND>() &&
899
66.6k
               !RowsetMetaManager::row_binlog_meta_exists(get_meta(), tablet->tablet_uid(),
900
424
                                                          rowset_id);
901
66.6k
    };
902
903
    // rowset_id -> is_garbage
904
281k
    std::unordered_map<RowsetId, bool> checked_rowsets;
905
281k
    for (auto&& [rowset_id, filename] : rowsets_not_pending) {
906
75.6k
        if (_stop_bg_worker) {
907
0
            return;
908
0
        }
909
910
75.6k
        if (auto it = checked_rowsets.find(rowset_id); it != checked_rowsets.end()) {
911
8.97k
            if (it->second) { // Is checked garbage rowset
912
62
                reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
913
62
            }
914
8.97k
            continue;
915
8.97k
        }
916
917
66.7k
        if (should_reclaim(rowset_id)) {
918
444
            if (config::path_gc_check_step > 0 &&
919
444
                ++_path_gc_step % config::path_gc_check_step == 0) {
920
0
                std::this_thread::sleep_for(
921
0
                        std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
922
0
            }
923
444
            reclaim_rowset_file(tablet_schema_hash_path + '/' + filename);
924
444
            checked_rowsets.emplace(rowset_id, true);
925
66.2k
        } else {
926
66.2k
            checked_rowsets.emplace(rowset_id, false);
927
66.2k
        }
928
66.7k
    }
929
281k
}
930
931
13
void DataDir::perform_path_gc() {
932
13
    if (_stop_bg_worker) {
933
0
        return;
934
0
    }
935
936
13
    LOG(INFO) << "start to gc data dir " << _path;
937
13
    auto data_path = fmt::format("{}/{}", _path, DATA_PREFIX);
938
13
    std::vector<io::FileInfo> shards;
939
13
    bool exists = true;
940
13
    const auto& fs = io::global_local_filesystem();
941
13
    auto st = fs->list(data_path, false, &shards, &exists);
942
13
    if (!st.ok()) [[unlikely]] {
943
0
        LOG(WARNING) << "failed to scan data dir: " << st;
944
0
        return;
945
0
    }
946
947
12.2k
    for (const auto& shard : shards) {
948
12.2k
        if (_stop_bg_worker) {
949
0
            break;
950
0
        }
951
952
12.2k
        if (shard.is_file) {
953
0
            continue;
954
0
        }
955
956
12.2k
        auto shard_path = fmt::format("{}/{}", data_path, shard.file_name);
957
12.2k
        std::vector<io::FileInfo> tablet_ids;
958
12.2k
        st = io::global_local_filesystem()->list(shard_path, false, &tablet_ids, &exists);
959
12.2k
        if (!st.ok()) [[unlikely]] {
960
0
            LOG(WARNING) << "fail to walk dir, shard_path=" << shard_path << " : " << st;
961
0
            continue;
962
0
        }
963
964
281k
        for (const auto& tablet_id : tablet_ids) {
965
281k
            if (_stop_bg_worker) {
966
0
                break;
967
0
            }
968
969
281k
            if (tablet_id.is_file) {
970
0
                continue;
971
0
            }
972
973
281k
            auto tablet_id_path = fmt::format("{}/{}", shard_path, tablet_id.file_name);
974
281k
            std::vector<io::FileInfo> schema_hashes;
975
281k
            st = fs->list(tablet_id_path, false, &schema_hashes, &exists);
976
281k
            if (!st.ok()) [[unlikely]] {
977
0
                LOG(WARNING) << "fail to walk dir, tablet_id_path=" << tablet_id_path << " : "
978
0
                             << st;
979
0
                continue;
980
0
            }
981
982
281k
            for (auto&& schema_hash : schema_hashes) {
983
281k
                if (schema_hash.is_file) {
984
0
                    continue;
985
0
                }
986
987
281k
                if (config::path_gc_check_step > 0 &&
988
281k
                    ++_path_gc_step % config::path_gc_check_step == 0) {
989
278
                    std::this_thread::sleep_for(
990
278
                            std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
991
278
                }
992
281k
                int16_t shard_id = -1;
993
281k
                try {
994
281k
                    shard_id = cast_set<int16_t>(std::stoi(shard.file_name));
995
281k
                } catch (const std::exception&) {
996
0
                    LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name;
997
0
                    continue;
998
0
                }
999
281k
                _perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id);
1000
281k
            }
1001
281k
        }
1002
12.2k
    }
1003
1004
13
    LOG(INFO) << "gc data dir path: " << _path << " finished";
1005
13
}
1006
1007
588
// Refresh cached disk capacity / free-space numbers from the filesystem and publish
// them to the capacity metrics. Returns an error if the space query fails.
Status DataDir::update_capacity() {
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
                                                                  &_available_bytes));
    // Publish both gauges; order between the two is irrelevant.
    disks_avail_capacity->set_value(_available_bytes);
    disks_total_capacity->set_value(_disk_capacity_bytes);
    LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
              << ", available capacity: " << _available_bytes << ", usage: " << get_usage(0)
              << ", in_use: " << is_used();

    return Status::OK();
}
1018
1019
54
void DataDir::update_trash_capacity() {
1020
54
    auto trash_path = fmt::format("{}/{}", _path, TRASH_PREFIX);
1021
54
    try {
1022
54
        _trash_used_bytes = _engine.get_file_or_directory_size(trash_path);
1023
54
    } catch (const std::filesystem::filesystem_error& e) {
1024
0
        LOG(WARNING) << "update trash capacity failed, path: " << _path << ", err: " << e.what();
1025
0
        return;
1026
0
    }
1027
54
    disks_trash_used_capacity->set_value(_trash_used_bytes);
1028
54
    LOG(INFO) << "path: " << _path << " trash capacity: " << _trash_used_bytes;
1029
54
}
1030
1031
355
void DataDir::update_local_data_size(int64_t size) {
1032
355
    disks_local_used_capacity->set_value(size);
1033
355
}
1034
1035
355
void DataDir::update_remote_data_size(int64_t size) {
1036
355
    disks_remote_used_capacity->set_value(size);
1037
355
}
1038
1039
0
size_t DataDir::tablet_size() const {
1040
0
    std::lock_guard<std::mutex> l(_mutex);
1041
0
    return _tablet_set.size();
1042
0
}
1043
1044
28.9k
bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
1045
28.9k
    double used_pct = get_usage(incoming_data_size);
1046
28.9k
    int64_t left_bytes = _available_bytes - incoming_data_size;
1047
28.9k
    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
1048
28.9k
        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
1049
0
        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
1050
0
                     << ", left bytes: " << left_bytes << ", path: " << _path;
1051
0
        return true;
1052
0
    }
1053
28.9k
    return false;
1054
28.9k
}
1055
1056
2.87k
void DataDir::disks_compaction_score_increment(int64_t delta) {
1057
2.87k
    disks_compaction_score->increment(delta);
1058
2.87k
}
1059
1060
2.87k
void DataDir::disks_compaction_num_increment(int64_t delta) {
1061
2.87k
    disks_compaction_num->increment(delta);
1062
2.87k
}
1063
1064
4.69k
// Move a tablet directory into this dir's trash area (or delete it outright when the
// trash expiry is disabled), then prune the now-empty parent (tablet_id-level) dir.
Status DataDir::move_to_trash(const std::string& tablet_path) {
    // Trash disabled: delete immediately instead of staging.
    if (config::trash_file_expire_time_sec <= 0) {
        LOG(INFO) << "delete tablet dir " << tablet_path
                  << " directly due to trash_file_expire_time_sec is 0";
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
        return delete_tablet_parent_path_if_empty(tablet_path);
    }

    // 1. get timestamp string
    std::string timestamp;
    Status res = gen_timestamp_string(&timestamp);
    if (res != Status::OK()) {
        LOG(WARNING) << "failed to generate time_string when move file to trash.err code=" << res;
        return res;
    }

    // 2. generate new file path
    // a global counter to avoid file name duplication.
    static std::atomic<uint64_t> delete_counter(0);
    auto trash_root_path =
            fmt::format("{}/{}/{}.{}", _path, TRASH_PREFIX, timestamp, delete_counter++);
    auto src_path = io::Path(tablet_path);
    auto trash_tablet_path = trash_root_path /
                             src_path.parent_path().filename() /* tablet_id */ /
                             src_path.filename() /* schema_hash */;

    // 3. create target dir, or the rename() function will fail.
    auto trash_tablet_parent = trash_tablet_path.parent_path();
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(trash_tablet_parent, &exists));
    if (!exists) {
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(trash_tablet_parent));
    }

    // 4. move tablet to trash
    VLOG_NOTICE << "move file to trash. " << tablet_path << " -> " << trash_tablet_path;
    if (rename(tablet_path.c_str(), trash_tablet_path.c_str()) < 0) {
        return Status::Error<OS_ERROR>("move file to trash failed. file={}, target={}, err={}",
                                       tablet_path, trash_tablet_path.native(), Errno::str());
    }

    // 5. check parent dir of source file, delete it when empty
    RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path));

    return Status::OK();
}
1111
1112
4.69k
// Remove the tablet_id-level parent directory of `tablet_path` when it no longer
// contains any entries, propagating list/delete errors to the caller.
Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) {
    std::string parent_dir = io::Path(tablet_path).parent_path(); // tablet_id level
    std::vector<io::FileInfo> children;
    bool exists = true;
    RETURN_IF_ERROR(io::global_local_filesystem()->list(parent_dir, false, &children, &exists));
    if (children.empty()) {
        LOG(INFO) << "remove empty dir " << parent_dir;
        RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(parent_dir));
    }
    return Status::OK();
}
1126
1127
54
void DataDir::perform_remote_rowset_gc() {
1128
54
    std::vector<std::pair<std::string, std::string>> gc_kvs;
1129
54
    auto traverse_remote_rowset_func = [&gc_kvs](std::string_view key,
1130
54
                                                 std::string_view value) -> bool {
1131
0
        gc_kvs.emplace_back(key, value);
1132
0
        return true;
1133
0
    };
1134
54
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX,
1135
54
                                     traverse_remote_rowset_func));
1136
54
    std::vector<std::string> deleted_keys;
1137
54
    for (auto& [key, val] : gc_kvs) {
1138
0
        auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
1139
0
        RemoteRowsetGcPB gc_pb;
1140
0
        if (!gc_pb.ParseFromString(val)) {
1141
0
            LOG(WARNING) << "malformed RemoteRowsetGcPB. rowset_id=" << rowset_id;
1142
0
            deleted_keys.push_back(std::move(key));
1143
0
            continue;
1144
0
        }
1145
1146
0
        auto storage_resource = get_storage_resource(gc_pb.resource_id());
1147
0
        if (!storage_resource) {
1148
0
            LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
1149
0
            continue;
1150
0
        }
1151
1152
0
        std::vector<io::Path> seg_paths;
1153
0
        seg_paths.reserve(gc_pb.num_segments());
1154
0
        for (int i = 0; i < gc_pb.num_segments(); ++i) {
1155
0
            seg_paths.emplace_back(
1156
0
                    storage_resource->first.remote_segment_path(gc_pb.tablet_id(), rowset_id, i));
1157
0
        }
1158
1159
0
        auto& fs = storage_resource->first.fs;
1160
0
        LOG(INFO) << "delete remote rowset. root_path=" << fs->root_path()
1161
0
                  << ", rowset_id=" << rowset_id;
1162
0
        auto st = fs->batch_delete(seg_paths);
1163
0
        if (st.ok()) {
1164
0
            deleted_keys.push_back(std::move(key));
1165
0
            unused_remote_rowset_num << -1;
1166
0
        } else {
1167
0
            LOG(WARNING) << "failed to delete remote rowset. err=" << st;
1168
0
        }
1169
0
    }
1170
54
    for (const auto& key : deleted_keys) {
1171
0
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
1172
0
    }
1173
54
}
1174
1175
54
void DataDir::perform_remote_tablet_gc() {
1176
54
    std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
1177
54
    auto traverse_remote_tablet_func = [&tablet_gc_kvs](std::string_view key,
1178
54
                                                        std::string_view value) -> bool {
1179
0
        tablet_gc_kvs.emplace_back(key, value);
1180
0
        return true;
1181
0
    };
1182
54
    static_cast<void>(_meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX,
1183
54
                                     traverse_remote_tablet_func));
1184
54
    std::vector<std::string> deleted_keys;
1185
54
    for (auto& [key, val] : tablet_gc_kvs) {
1186
0
        auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
1187
0
        RemoteTabletGcPB gc_pb;
1188
0
        if (!gc_pb.ParseFromString(val)) {
1189
0
            LOG(WARNING) << "malformed RemoteTabletGcPB. tablet_id=" << tablet_id;
1190
0
            deleted_keys.push_back(std::move(key));
1191
0
            continue;
1192
0
        }
1193
0
        bool success = true;
1194
0
        for (auto& resource_id : gc_pb.resource_ids()) {
1195
0
            auto fs = get_filesystem(resource_id);
1196
0
            if (!fs) {
1197
0
                LOG(WARNING) << "could not get file system. resource_id=" << resource_id;
1198
0
                success = false;
1199
0
                continue;
1200
0
            }
1201
0
            LOG(INFO) << "delete remote rowsets of tablet. root_path=" << fs->root_path()
1202
0
                      << ", tablet_id=" << tablet_id;
1203
0
            auto st = fs->delete_directory(DATA_PREFIX + '/' + tablet_id);
1204
0
            if (!st.ok()) {
1205
0
                LOG(WARNING) << "failed to delete all remote rowset in tablet. err=" << st;
1206
0
                success = false;
1207
0
            }
1208
0
        }
1209
0
        if (success) {
1210
0
            deleted_keys.push_back(std::move(key));
1211
0
        }
1212
0
    }
1213
54
    for (const auto& key : deleted_keys) {
1214
0
        static_cast<void>(_meta->remove(META_COLUMN_FAMILY_INDEX, key));
1215
0
    }
1216
54
}
1217
} // namespace doris