Coverage Report

Created: 2026-03-15 22:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/wal/wal_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/group_commit/wal/wal_manager.h"
19
20
#include <absl/strings/str_split.h>
21
#include <bvar/bvar.h>
22
#include <glog/logging.h>
23
24
#include <chrono>
25
#include <filesystem>
26
#include <shared_mutex>
27
#include <thread>
28
#include <unordered_map>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "io/fs/local_file_system.h"
34
#include "load/group_commit/wal/wal_dirs_info.h"
35
#include "load/group_commit/wal/wal_reader.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/fragment_mgr.h"
38
#include "util/parse_util.h"
39
40
namespace doris {
41
42
// Exposed as "wal_total_count"; refreshed by WalManager::_replay_background()
// with the result of get_wal_queue_size(-1).
bvar::Status<size_t> g_wal_total_count("wal_total_count", 0);
43
// Exposed as "wal_max_count_per_table"; updated by WalManager::get_wal_queue_size()
// whenever it walks all tables, with the largest per-table queue size it saw.
bvar::Status<size_t> g_wal_max_count_per_table("wal_max_count_per_table", 0);
44
45
// Constructs a WalManager bound to `exec_env`.
//
// `wal_dir_list` is split on ';' (whitespace-only entries are skipped) into the
// configured WAL directory list. The replay thread pool and the WalDirsInfo
// registry are created here; init() is what validates the directories and
// starts the background threads.
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
        : _exec_env(exec_env),
          _stop(false),
          _stop_background_threads_latch(1),
          _first_replay(true) {
    _wal_dirs = absl::StrSplit(wal_dir_list, ";", absl::SkipWhitespace());
    // A constructor cannot return a Status, so report a failed pool build in
    // the log instead of discarding it silently.
    auto pool_st = ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
                           .set_min_threads(1)
                           .set_max_threads(config::group_commit_relay_wal_threads)
                           .build(&_thread_pool);
    if (!pool_st.ok()) {
        LOG(WARNING) << "failed to build GroupCommitReplayWalThreadPool, st="
                     << pool_st.to_string();
    }
    _wal_dirs_info = WalDirsInfo::create_unique();
}
57
58
2
// Only logs; threads and the pool are shut down by stop(), not here.
WalManager::~WalManager() {
    LOG(INFO) << "WalManager is destoried";
}
61
62
0
bool WalManager::is_running() {
63
0
    return !_stop.load();
64
0
}
65
66
1
void WalManager::stop() {
67
1
    if (!this->_stop.load()) {
68
1
        this->_stop.store(true);
69
1
        _stop_relay_wal();
70
1
        _stop_background_threads_latch.count_down();
71
1
        if (_replay_thread) {
72
0
            _replay_thread->join();
73
0
        }
74
1
        if (_update_wal_dirs_info_thread) {
75
0
            _update_wal_dirs_info_thread->join();
76
0
        }
77
1
        _thread_pool->shutdown();
78
1
        LOG(INFO) << "WalManager is stopped";
79
1
    }
80
1
}
81
82
0
// Resolves and creates the WAL directories, registers their limits, then
// starts the "replay_wal" background thread. Returns the first error hit.
Status WalManager::init() {
    RETURN_IF_ERROR(_init_wal_dirs_conf());
    RETURN_IF_ERROR(_init_wal_dirs());
    RETURN_IF_ERROR(_init_wal_dirs_info());
    // The loop's Status is intentionally dropped: a thread body has no caller
    // to report it to.
    auto replay_loop = [this]() { static_cast<void>(this->_replay_background()); };
    return Thread::create("WalMgr", "replay_wal", replay_loop, &_replay_thread);
}
90
91
0
// Finalizes `_wal_dirs`.
//
// With no configured directories, one "<store_path>/wal" entry is derived per
// store path. Otherwise every configured entry must be an absolute path; the
// first relative one yields InternalError and leaves `_wal_dirs` unchanged.
Status WalManager::_init_wal_dirs_conf() {
    std::vector<std::string> resolved_dirs;
    if (!_wal_dirs.empty()) {
        // user config must be absolute path.
        for (const std::string& configured_dir : _wal_dirs) {
            if (!std::filesystem::path(configured_dir).is_absolute()) {
                return Status::InternalError(
                        "BE config group_commit_replay_wal_dir has to be absolute path!");
            }
            resolved_dirs.emplace_back(configured_dir);
        }
    } else {
        // default case.
        for (const StorePath& store_path : ExecEnv::GetInstance()->store_paths()) {
            resolved_dirs.emplace_back(store_path.path + "/wal");
        }
    }
    _wal_dirs = resolved_dirs;
    return Status::OK();
}
112
113
0
// Makes sure every WAL directory and its "<wal_dir>/<_tmp>" subdirectory
// exist, creating whichever is missing. Returns the first filesystem error.
Status WalManager::_init_wal_dirs() {
    // Creates `dir` unless the filesystem reports it already exists.
    auto ensure_directory = [](const std::string& dir) -> Status {
        bool present = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &present));
        if (!present) {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
        }
        return Status::OK();
    };

    for (const auto& wal_dir : _wal_dirs) {
        const std::string tmp_dir = wal_dir + "/" + _tmp;
        LOG(INFO) << "wal_dir:" << wal_dir << ", tmp_dir:" << tmp_dir;
        RETURN_IF_ERROR(ensure_directory(wal_dir));
        RETURN_IF_ERROR(ensure_directory(tmp_dir));
    }
    return Status::OK();
}
129
130
15
// Registers every WAL directory in `_wal_dirs_info` with its disk limit and
// current usage, then (outside BE_TEST) starts the "update_wal_dir_info"
// background thread.
//
// The limit comes from config::group_commit_wal_max_disk_limit, parsed against
// the directory's available bytes; a negative parse result is reported as
// InternalError. Under BE_TEST the available bytes come from
// `wal_limit_test_bytes`, which is then overwritten with the computed limit.
Status WalManager::_init_wal_dirs_info() {
    for (const std::string& dir : _wal_dirs) {
        size_t free_bytes;
#ifndef BE_TEST
        size_t capacity_bytes;
        RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(dir, &capacity_bytes,
                                                                      &free_bytes));
#else
        free_bytes = wal_limit_test_bytes;
#endif
        bool limit_is_percent = true;
        int64_t disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, -1,
                                                       free_bytes, &limit_is_percent);
        if (disk_limit < 0) {
            return Status::InternalError(
                    "group_commit_wal_max_disk_limit config is wrong, please check your config!");
        }
        // WAL files already on disk are not part of the available bytes, so a
        // percentage-based limit is extended by the directory's current size.
        size_t used_bytes = 0;
#ifndef BE_TEST
        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir, &used_bytes));
#endif
        if (limit_is_percent) {
            disk_limit += used_bytes;
        }
        RETURN_IF_ERROR(_wal_dirs_info->add(dir, disk_limit, used_bytes, 0));

#ifdef BE_TEST
        wal_limit_test_bytes = disk_limit;
#endif
    }
#ifndef BE_TEST
    return Thread::create(
            "WalMgr", "update_wal_dir_info",
            [this]() { static_cast<void>(this->_update_wal_dir_info_thread()); },
            &_update_wal_dirs_info_thread);
#else
    return Status::OK();
#endif
}
170
171
0
void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) {
172
0
    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
173
0
    LOG(INFO) << "add wal to queue, table_id: " << table_id << ", wal_id: " << wal_id;
174
0
    auto it = _wal_queues.find(table_id);
175
0
    if (it == _wal_queues.end()) {
176
0
        std::set<int64_t> tmp_set;
177
0
        tmp_set.insert(wal_id);
178
0
        _wal_queues.emplace(table_id, tmp_set);
179
0
    } else {
180
0
        it->second.insert(wal_id);
181
0
    }
182
0
}
183
184
0
void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) {
185
0
    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
186
0
    auto it = _wal_queues.find(table_id);
187
0
    if (it != _wal_queues.end()) {
188
0
        LOG(INFO) << "remove wal from queue, table_id: " << table_id << ", wal_id: " << wal_id;
189
0
        it->second.erase(wal_id);
190
0
        if (it->second.empty()) {
191
0
            _wal_queues.erase(table_id);
192
0
        }
193
0
    }
194
0
}
195
196
0
size_t WalManager::get_wal_queue_size(int64_t table_id) {
197
0
    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
198
0
    size_t count = 0;
199
0
    if (table_id > 0) {
200
0
        auto it = _wal_queues.find(table_id);
201
0
        if (it != _wal_queues.end()) {
202
0
            return it->second.size();
203
0
        } else {
204
0
            return 0;
205
0
        }
206
0
    } else {
207
        // table_id is -1 meaning get all table wal size
208
0
        size_t max_count_per_table = 0;
209
0
        for (auto& [_, table_wals] : _wal_queues) {
210
0
            size_t table_wal_count = table_wals.size();
211
0
            count += table_wal_count;
212
0
            if (table_wal_count > max_count_per_table) {
213
0
                max_count_per_table = table_wal_count;
214
0
            }
215
0
        }
216
0
        g_wal_max_count_per_table.set_value(max_count_per_table);
217
0
    }
218
0
    return count;
219
0
}
220
221
// Picks a WAL directory and registers the path for a new WAL.
//
// `base_path` receives the directory returned by
// WalDirsInfo::get_available_random_wal_dir() (it is written even when the
// call fails). The registered path is
//   <base_path>/<db_id>/<table_id>/<wal_version>_<backend_id>_<wal_id>_<label>
// Returns InternalError when `wal_id` is already present in `_wal_path_map`.
Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
                                   const std::string& label, std::string& base_path,
                                   uint32_t wal_version) {
    base_path = _wal_dirs_info->get_available_random_wal_dir();
    std::stringstream path_builder;
    path_builder << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id)
                 << "/" << std::to_string(wal_version) << "_"
                 << _exec_env->cluster_info()->backend_id << "_" << std::to_string(wal_id) << "_"
                 << label;

    std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
    if (_wal_path_map.find(wal_id) != _wal_path_map.end()) {
        return Status::InternalError("wal_id {} already in wal_path_map", wal_id);
    }
    _wal_path_map.emplace(wal_id, path_builder.str());
    return Status::OK();
}
239
240
0
// Looks up the registered path of `wal_id` and copies it into `wal_path`.
// Returns InternalError (leaving `wal_path` untouched) when the id is unknown.
Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
    std::shared_lock rdlock(_wal_path_lock);
    auto it = _wal_path_map.find(wal_id);
    if (it == _wal_path_map.end()) {
        return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id);
    }
    // Read through the iterator: operator[] is a mutating accessor and must
    // not be used while only the shared lock is held.
    wal_path = it->second;
    return Status::OK();
}
250
251
// Splits a WAL file name of the form
//   <version>_<backend_id>_<wal_id>_<label>
// into its parts. The label is everything after the third '_' and may itself
// contain '_' or be empty.
//
// Returns InvalidArgument when one of the three '_' separators is missing or
// a numeric field does not start with a number. std::strtoll never throws, so
// both conditions are checked explicitly. Out-parameters of fields parsed
// before the failure keep their parsed value.
Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version,
                                  int64_t& backend_id, int64_t& wal_id, std::string& label) {
    size_t cursor = 0;
    // Parses the '_'-terminated decimal field starting at `cursor` into `out`
    // and advances `cursor` past the separator.
    auto next_field = [&file_name, &cursor](const char* field_name, int64_t& out) -> Status {
        const size_t sep = file_name.find('_', cursor);
        if (sep == std::string::npos) {
            return Status::InvalidArgument("Invalid format, missing {} in wal file name {}",
                                           field_name, file_name);
        }
        const std::string token = file_name.substr(cursor, sep - cursor);
        char* parse_end = nullptr;
        const long long value = std::strtoll(token.c_str(), &parse_end, 10);
        // strtoll leaves the end pointer at the start when no digits were consumed.
        if (parse_end == token.c_str()) {
            return Status::InvalidArgument("Invalid format, {} is not a number in wal file name {}",
                                           field_name, file_name);
        }
        out = value;
        cursor = sep + 1;
        return Status::OK();
    };

    RETURN_IF_ERROR(next_field("version", version));
    RETURN_IF_ERROR(next_field("backend_id", backend_id));
    RETURN_IF_ERROR(next_field("wal_id", wal_id));
    label = file_name.substr(cursor);
    VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << ",wal_id:" << wal_id
               << ",label:" << label;
    return Status::OK();
}
274
275
0
// Scans every WAL directory and re-registers the WAL files found there:
// each one is added to `_wal_path_map`, to the per-table queue and to its
// WalTable for replay. Failures on a single directory or file are logged and
// skipped; the function itself always returns OK.
Status WalManager::_load_wals() {
    std::vector<ScanWalInfo> found_wals;
    for (const auto& wal_dir : _wal_dirs) {
        WARN_IF_ERROR(_scan_wals(wal_dir, found_wals),
                      fmt::format("fail to scan wal dir={}", wal_dir));
    }
    for (const auto& wal : found_wals) {
        bool exists = false;
        WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, &exists),
                      fmt::format("fail to check exist on wal file={}", wal.wal_path));
        if (!exists) {
            continue;
        }
        LOG(INFO) << "find wal: " << wal.wal_path;
        {
            std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
            // emplace() does not overwrite, so a false `second` means the id
            // was already registered.
            const bool inserted = _wal_path_map.emplace(wal.wal_id, wal.wal_path).second;
            if (!inserted) {
                LOG(INFO) << "wal_id " << wal.wal_id << " already in wal_path_map, skip it";
                continue;
            }
        }
        // this config is use for test p0 case in pipeline
        if (config::group_commit_wait_replay_wal_finish) {
            auto wait_mutex = std::make_shared<std::mutex>();
            auto wait_cv = std::make_shared<std::condition_variable>();
            if (!add_wal_cv_map(wal.wal_id, wait_mutex, wait_cv).ok()) {
                LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to wal_cv_map";
                continue;
            }
        }
        _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id);
        WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path),
                      fmt::format("Failed to add recover wal={}", wal.wal_path));
    }
    return Status::OK();
}
313
314
0
// Walks `<wal_path>/<db_id>/<table_id>/<wal file>` and appends one ScanWalInfo
// per parsable WAL file to `res`.
//
// Plain files at the database/table levels and the `_tmp` directory are
// ignored. A directory whose name does not start with a number is skipped
// with a warning: std::strtoll never throws, so the name is validated through
// its end pointer instead of a catch block. A listing failure aborts the scan
// and is returned; entries already appended to `res` stay there.
Status WalManager::_scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res) {
    // Parses a directory name as a decimal id; false when no digits were read.
    auto parse_dir_id = [](const std::string& dir_name, int64_t& id) {
        char* parse_end = nullptr;
        id = std::strtoll(dir_name.c_str(), &parse_end, 10);
        return parse_end != dir_name.c_str();
    };

    bool exists = false;
    const auto last_total_size = res.size();
    std::vector<io::FileInfo> dbs;
    Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists);
    if (!st.ok()) {
        LOG(WARNING) << "failed list files for wal_dir=" << wal_path << ", st=" << st.to_string();
        return st;
    }
    for (const auto& database_id : dbs) {
        if (database_id.is_file || database_id.file_name == _tmp) {
            continue;
        }
        int64_t db_id = -1;
        if (!parse_dir_id(database_id.file_name, db_id)) {
            LOG(WARNING) << "skip wal dir with non-numeric db id, dir=" << wal_path << "/"
                         << database_id.file_name;
            continue;
        }
        std::vector<io::FileInfo> tables;
        auto db_path = wal_path + "/" + database_id.file_name;
        st = io::global_local_filesystem()->list(db_path, false, &tables, &exists);
        if (!st.ok()) {
            LOG(WARNING) << "failed list files for wal_dir=" << db_path
                         << ", st=" << st.to_string();
            return st;
        }
        for (const auto& table_id : tables) {
            if (table_id.is_file) {
                continue;
            }
            int64_t tb_id = -1;
            if (!parse_dir_id(table_id.file_name, tb_id)) {
                LOG(WARNING) << "skip wal dir with non-numeric table id, dir=" << db_path << "/"
                             << table_id.file_name;
                continue;
            }
            std::vector<io::FileInfo> wals;
            auto table_path = db_path + "/" + table_id.file_name;
            st = io::global_local_filesystem()->list(table_path, false, &wals, &exists);
            if (!st.ok()) {
                LOG(WARNING) << "failed list files for wal_dir=" << table_path
                             << ", st=" << st.to_string();
                return st;
            }
            for (const auto& wal : wals) {
                int64_t version = -1;
                int64_t backend_id = -1;
                int64_t wal_id = -1;
                std::string label;
                auto parse_st = parse_wal_path(wal.file_name, version, backend_id, wal_id, label);
                if (!parse_st.ok()) {
                    LOG(WARNING) << "fail to parse file=" << wal.file_name
                                 << ",st=" << parse_st.to_string();
                    continue;
                }
                ScanWalInfo scan_wal_info;
                scan_wal_info.wal_path = table_path + "/" + wal.file_name;
                scan_wal_info.db_id = db_id;
                scan_wal_info.tb_id = tb_id;
                scan_wal_info.wal_id = wal_id;
                scan_wal_info.be_id = backend_id;
                res.emplace_back(scan_wal_info);
            }
        }
    }
    LOG(INFO) << "Finish list wal_dir=" << wal_path
              << ", wal count=" << std::to_string(res.size() - last_total_size);
    return Status::OK();
}
384
385
0
// Body of the "replay_wal" background thread.
//
// Each round: skips work until a heartbeat has arrived, loads the WALs left on
// disk (first round only), refreshes g_wal_total_count, then submits one
// replay task per table that has WALs. Rounds are separated by
// config::group_commit_replay_wal_retry_interval_seconds; the loop ends when
// stop() is observed or the latch is released. An error from _load_wals() or
// from submitting a task ends the thread and is returned.
Status WalManager::_replay_background() {
    do {
        if (_stop.load()) {
            break;
        }
        // port == 0 means not received heartbeat yet.
        // `continue` in a do-while jumps to the wait in the loop condition.
        if (_exec_env->cluster_info() != nullptr &&
            _exec_env->cluster_info()->master_fe_addr.port == 0) {
            continue;
        }
        // replay residual wal, only replay once
        bool expected = true;
        if (_first_replay.compare_exchange_strong(expected, false)) {
            RETURN_IF_ERROR(_load_wals());
        }
        g_wal_total_count.set_value(get_wal_queue_size(-1));
        // Snapshot the tables to replay together with their owning pointers
        // while the lock is held. The pool tasks must not touch `_table_map`
        // themselves: add_recover_wal() can modify it concurrently.
        std::vector<std::pair<int64_t, std::shared_ptr<WalTable>>> replay_tables;
        {
            std::shared_lock rdlock(_table_lock);
            for (const auto& entry : _table_map) {
                if (entry.second->size() > 0) {
                    replay_tables.emplace_back(entry.first, entry.second);
                }
            }
        }
        for (const auto& entry : replay_tables) {
            const int64_t table_id = entry.first;
            std::shared_ptr<WalTable> wal_table = entry.second;
            RETURN_IF_ERROR(_thread_pool->submit_func([table_id, wal_table] {
                auto st = wal_table->replay_wals();
                if (!st.ok()) {
                    LOG(WARNING) << "failed to submit replay wal for table=" << table_id
                                 << ", st=" << st.to_string();
                }
            }));
        }
    } while (!_stop_background_threads_latch.wait_for(
            std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds)));
    return Status::OK();
}
425
426
// Hands the WAL file `wal` to the WalTable of `table_id`, creating that
// WalTable on first use. Outside BE_TEST the limit/usage bookkeeping of the
// WAL's base directory is refreshed as well; failures there are only logged.
// Always returns OK.
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
                                   std::string wal) {
    std::lock_guard<std::shared_mutex> wrlock(_table_lock);
    std::shared_ptr<WalTable> wal_table;
    if (auto found = _table_map.find(table_id); found != _table_map.end()) {
        wal_table = found->second;
    } else {
        wal_table = std::make_shared<WalTable>(_exec_env, db_id, table_id);
        _table_map.emplace(table_id, wal_table);
    }
    wal_table->add_wal(wal_id, wal);
#ifndef BE_TEST
    WARN_IF_ERROR(update_wal_dir_limit(get_base_wal_path(wal)),
                  "Failed to update wal dir limit while add recover wal!");
    WARN_IF_ERROR(update_wal_dir_used(get_base_wal_path(wal)),
                  "Failed to update wal dir used while add recove wal!");
#endif
    return Status::OK();
}
446
447
0
size_t WalManager::get_wal_table_size(int64_t table_id) {
448
0
    std::shared_lock rdlock(_table_lock);
449
0
    auto it = _table_map.find(table_id);
450
0
    if (it != _table_map.end()) {
451
0
        return it->second->size();
452
0
    } else {
453
0
        return 0;
454
0
    }
455
0
}
456
457
1
void WalManager::_stop_relay_wal() {
458
1
    std::lock_guard<std::shared_mutex> wrlock(_table_lock);
459
1
    for (auto& [_, wal_table] : _table_map) {
460
0
        wal_table->stop();
461
0
    }
462
1
}
463
464
2
size_t WalManager::get_max_available_size() {
465
2
    return _wal_dirs_info->get_max_available_size();
466
2
}
467
468
0
std::string WalManager::get_wal_dirs_info_string() {
469
0
    return _wal_dirs_info->get_wal_dirs_info_string();
470
0
}
471
472
0
// Forwards `wal_dir` and `limit` to WalDirsInfo::update_wal_dir_limit() and
// returns its status.
Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) {
    Status update_st = _wal_dirs_info->update_wal_dir_limit(wal_dir, limit);
    return update_st;
}
475
476
0
// Forwards `wal_dir` and `used` to WalDirsInfo::update_wal_dir_used() and
// returns its status.
Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) {
    Status update_st = _wal_dirs_info->update_wal_dir_used(wal_dir, used);
    return update_st;
}
479
480
// Forwards the increase/decrease of estimated WAL bytes for `wal_dir` to
// WalDirsInfo::update_wal_dir_estimated_wal_bytes() and returns its status.
Status WalManager::update_wal_dir_estimated_wal_bytes(const std::string& wal_dir,
                                                      size_t increase_estimated_wal_bytes,
                                                      size_t decrease_estimated_wal_bytes) {
    Status update_st = _wal_dirs_info->update_wal_dir_estimated_wal_bytes(
            wal_dir, increase_estimated_wal_bytes, decrease_estimated_wal_bytes);
    return update_st;
}
486
487
0
// Body of the "update_wal_dir_info" background thread.
//
// Until stop() is observed: while ExecEnv is not ready it sleeps 1s per
// round; once ready it refreshes the limit and usage of every WAL directory
// every 100ms and logs the summary once per 100 refreshes. Refresh errors are
// ignored. Always returns OK.
Status WalManager::_update_wal_dir_info_thread() {
    while (!_stop.load()) {
        if (ExecEnv::ready()) {
            static_cast<void>(_wal_dirs_info->update_all_wal_dir_limit());
            static_cast<void>(_wal_dirs_info->update_all_wal_dir_used());
            LOG_EVERY_N(INFO, 100)
                    << "Scheduled(every 10s) WAL info: " << get_wal_dirs_info_string();
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        } else {
            VLOG_DEBUG << "Sleep 1s to wait for storage engine init.";
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }
    return Status::OK();
}
501
502
0
// Forwards to WalDirsInfo::get_wal_dir_available_size(), which fills
// `available_bytes` for `wal_dir`, and returns its status.
Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes) {
    Status query_st = _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes);
    return query_st;
}
505
506
0
std::string WalManager::get_base_wal_path(const std::string& wal_path_str) {
507
0
    io::Path wal_path = wal_path_str;
508
0
    for (int i = 0; i < 3; ++i) {
509
0
        if (!wal_path.has_parent_path()) {
510
0
            return "";
511
0
        }
512
0
        wal_path = wal_path.parent_path();
513
0
    }
514
0
    return wal_path.string();
515
0
}
516
517
// Registers the mutex/condition-variable pair used to wait for the replay of
// `wal_id`. Returns InternalError when the id already has a pair.
Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock,
                                  std::shared_ptr<std::condition_variable> cv) {
    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
    if (_wal_cv_map.find(wal_id) != _wal_cv_map.end()) {
        return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id);
    }
    _wal_cv_map.emplace(wal_id, std::make_pair(lock, cv));
    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
    return Status::OK();
}
529
530
0
// Removes the wait pair of `wal_id`. Returns InternalError when there was
// nothing to remove.
Status WalManager::erase_wal_cv_map(int64_t wal_id) {
    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
    const auto erased_count = _wal_cv_map.erase(wal_id);
    if (erased_count == 0) {
        return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id);
    }
    LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
    return Status::OK();
}
539
540
0
// Blocks for up to 180 seconds until notify_relay_wal() signals `wal_id`,
// then removes the id's wait pair. Returns OK without waiting when no pair is
// registered; the only error comes from erase_wal_cv_map().
//
// NOTE(review): the wait has no predicate, so a notification sent before this
// call starts waiting is not observed, and a timeout is logged but otherwise
// treated like a normal wake-up.
Status WalManager::wait_replay_wal_finish(int64_t wal_id) {
    std::shared_ptr<std::mutex> lock = nullptr;
    std::shared_ptr<std::condition_variable> cv = nullptr;
    if (!get_lock_and_cv(wal_id, lock, cv).ok()) {
        return Status::OK();
    }
    std::unique_lock l(*lock);
    LOG(INFO) << "start wait " << wal_id;
    const auto wait_result = cv->wait_for(l, std::chrono::seconds(180));
    if (wait_result == std::cv_status::timeout) {
        LOG(WARNING) << "wait for " << wal_id << " is time out";
    }
    LOG(INFO) << "get wal " << wal_id << ",finish wait";
    RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
    LOG(INFO) << "erase wal " << wal_id;
    return Status::OK();
}
556
557
0
// Wakes every thread waiting on the condition variable registered for
// `wal_id`. Does nothing when no pair is registered. Always returns OK.
Status WalManager::notify_relay_wal(int64_t wal_id) {
    std::shared_ptr<std::mutex> lock = nullptr;
    std::shared_ptr<std::condition_variable> cv = nullptr;
    if (!get_lock_and_cv(wal_id, lock, cv).ok()) {
        return Status::OK();
    }
    // Notify while holding the same mutex the waiter uses.
    std::unique_lock l(*lock);
    cv->notify_all();
    LOG(INFO) << "get wal " << wal_id << ",notify all";
    return Status::OK();
}
568
569
// Copies the mutex/condition-variable pair registered for `wal_id` into
// `lock` and `cv`. Returns InternalError (outputs untouched) when the id has
// no pair.
Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock,
                                   std::shared_ptr<std::condition_variable>& cv) {
    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
    const auto entry = _wal_cv_map.find(wal_id);
    if (entry == _wal_cv_map.end()) {
        return Status::InternalError("cannot find txn {} in _wal_cv_map", wal_id);
    }
    const auto& [registered_lock, registered_cv] = entry->second;
    lock = registered_lock;
    cv = registered_cv;
    return Status::OK();
}
580
581
0
// Deletes the file registered for `wal_id` (when there is one), drops the id
// from `_wal_path_map` whether or not the deletion worked, and removes it
// from the queue of `table_id`. A failed deletion is only logged; the result
// is always OK.
Status WalManager::delete_wal(int64_t table_id, int64_t wal_id) {
    {
        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
        if (auto path_it = _wal_path_map.find(wal_id); path_it != _wal_path_map.end()) {
            // Copy the path out: the map entry is erased below.
            const std::string wal_path = path_it->second;
            auto delete_st = io::global_local_filesystem()->delete_file(wal_path);
            if (!delete_st.ok()) {
                LOG(WARNING) << "failed to delete wal=" << wal_path
                             << ", st=" << delete_st.to_string();
            } else {
                LOG(INFO) << "delete wal=" << wal_path;
            }
            _wal_path_map.erase(path_it);
        }
    }
    erase_wal_queue(table_id, wal_id);
    return Status::OK();
}
600
601
0
// Moves the WAL file `wal` from
//   <base>/<db_id>/<table_id>/<file>
// to
//   <base>/<_tmp>/<db_id>/<table_id>/<file>
// creating the target directory when needed, then forgets the WAL: `wal_id`
// is removed from `_wal_path_map` and from the queue of `table_id`.
//
// Returns InternalError when `wal` has fewer than three parent levels or the
// rename fails; filesystem errors from the existence check or directory
// creation are returned as-is.
Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) {
    io::Path wal_path = wal;
    // Peel off <file>, <table_id>, <db_id> (innermost first) so they can be
    // re-appended underneath the tmp directory.
    std::list<std::string> path_element;
    for (int i = 0; i < 3; ++i) {
        if (!wal_path.has_parent_path()) {
            return Status::InternalError("parent path is not enough when rename " + wal);
        }
        path_element.push_front(wal_path.filename().string());
        wal_path = wal_path.parent_path();
    }
    wal_path.append(_tmp);
    for (const auto& path : path_element) {
        wal_path.append(path);
    }
    bool exists = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
    if (!exists) {
        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
    }
    // std::rename reports failure with any nonzero value, not only negatives.
    if (std::rename(wal.c_str(), wal_path.string().c_str()) != 0) {
        LOG(WARNING) << "failed to rename wal from " << wal << " to " << wal_path.string();
        return Status::InternalError("rename fail on path " + wal);
    }
    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
    {
        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
        auto it = _wal_path_map.find(wal_id);
        if (it != _wal_path_map.end()) {
            _wal_path_map.erase(it);
        } else {
            LOG(WARNING) << "can't find " << wal_id << " in _wal_path_map when trying to rename";
        }
    }
    erase_wal_queue(table_id, wal_id);
    return Status::OK();
}
638
639
} // namespace doris