Coverage Report

Created: 2026-03-12 02:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/wal/wal_table.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/group_commit/wal/wal_table.h"

#include <absl/strings/str_split.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <cctype>
#include <cerrno>
#include <cstdlib>
#include <system_error>

#include "io/fs/local_file_system.h"
#include "io/fs/stream_load_pipe.h"
#include "load/group_commit/wal/wal_manager.h"
#include "runtime/fragment_mgr.h"
#include "service/http/action/http_stream.h"
#include "service/http/action/stream_load.h"
#include "service/http/ev_http_server.h"
#include "service/http/http_common.h"
#include "service/http/http_headers.h"
#include "service/http/utils.h"
#include "util/client_cache.h"
#include "util/path_util.h"
#include "util/thrift_rpc_helper.h"
36
37
namespace doris {
38
39
// Counter of failed wal replay attempts: _relay_wal_one_by_one adds 1 for every wal
// whose replay fails and is queued for retry. Registered with bvar under the name
// "group_commit_wal_fail".
bvar::Adder<uint64_t> wal_fail("group_commit_wal_fail");
40
41
// Creates the wal replay state for table `table_id` of database `db_id`.
// Replays are executed through an HttpStreamAction held by this object.
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {
    _http_stream_action = std::make_shared<HttpStreamAction>(_exec_env);
}

WalTable::~WalTable() = default;
46
47
0
void WalTable::add_wal(int64_t wal_id, std::string wal) {
48
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
49
0
    LOG(INFO) << "add replay wal=" << wal;
50
0
    auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
51
0
    _replay_wal_map.emplace(wal, wal_info);
52
0
}
53
54
0
void WalTable::_pick_relay_wals() {
55
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
56
0
    std::vector<std::string> need_replay_wals;
57
0
    std::vector<std::string> need_erase_wals;
58
0
    for (const auto& [wal_path, wal_info] : _replay_wal_map) {
59
0
        if (config::group_commit_wait_replay_wal_finish &&
60
0
            wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
61
0
            LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry "
62
0
                         << wal_info->get_retry_num() << " times";
63
0
            [[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path(
64
0
                    wal_path, _table_id, wal_info->get_wal_id());
65
0
            auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
66
0
            if (!notify_st.ok()) {
67
0
                LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
68
0
            }
69
0
            need_erase_wals.push_back(wal_path);
70
0
            continue;
71
0
        }
72
0
        if (_need_replay(wal_info)) {
73
0
            need_replay_wals.push_back(wal_path);
74
0
        }
75
0
    }
76
0
    for (const auto& wal : need_erase_wals) {
77
0
        _replay_wal_map.erase(wal);
78
0
    }
79
0
    std::sort(need_replay_wals.begin(), need_replay_wals.end());
80
0
    for (const auto& wal : need_replay_wals) {
81
0
        _replaying_queue.emplace_back(_replay_wal_map[wal]);
82
0
        _replay_wal_map.erase(wal);
83
0
    }
84
0
}
85
86
0
// Replays, sequentially, every wal currently in `_replaying_queue`.
//
// The retry counter of each wal is bumped before the attempt. A wal whose replay
// succeeds, or fails with one of the statuses treated as terminal below (PUBLISH_TIMEOUT,
// NOT_FOUND, DATA_QUALITY_ERROR, or a label reported as already COMMITTED/VISIBLE), is
// deleted through the wal manager. Every other failure puts the wal back into
// `_replay_wal_map` for a later attempt.
//
// The queue is always drained and failed wals always re-queued, even if notifying the
// wal manager fails. Returning early would leave `_replaying_queue` non-empty forever,
// which makes replay_wals() skip this table and stop() wait indefinitely.
//
// Returns the first notify_relay_wal() failure, or OK.
Status WalTable::_relay_wal_one_by_one() {
    std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
    Status notify_status = Status::OK();
    for (const auto& wal_info : _replaying_queue) {
        wal_info->add_retry_num();
        Status st;
        int64_t file_size = 0;
        std::filesystem::path file_path(wal_info->get_wal_path());
        // Non-throwing overloads: a filesystem_error escaping from here would skip the
        // queue cleanup at the end of this function.
        std::error_code ec;
        if (!std::filesystem::exists(file_path, ec)) {
            if (ec) {
                st = Status::InternalError("failed to check wal file {}: {}",
                                           wal_info->get_wal_path(), ec.message());
            } else {
                st = Status::InternalError("wal file {} does not exist",
                                           wal_info->get_wal_path());
            }
        } else {
            auto size = std::filesystem::file_size(file_path, ec);
            // The size is only logged, so failing to read it is not an error.
            file_size = ec ? 0 : static_cast<int64_t>(size);
            st = _replay_wal_internal(wal_info->get_wal_path());
        }
        auto msg = st.msg();
        if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
            st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
            (msg.find("has already been used") != msg.npos &&
             (msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") != msg.npos))) {
            LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
                      << ", st=" << st.to_string() << ", file size=" << file_size;
            // delete wal
            WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, wal_info->get_wal_id()),
                          "failed to delete wal=" + wal_info->get_wal_path());
            if (config::group_commit_wait_replay_wal_finish) {
                auto st_notify = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
                if (!st_notify.ok()) {
                    LOG(WARNING) << "notify wal " << wal_info->get_wal_id()
                                 << " fail, st=" << st_notify.to_string();
                    // Remember the first failure but keep going, see the header comment.
                    if (notify_status.ok()) {
                        notify_status = st_notify;
                    }
                }
            }
        } else {
            doris::wal_fail << 1;
            LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
                         << ", st=" << st.to_string();
            need_retry_wals.push_back(wal_info);
        }
    }
    {
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        _replaying_queue.clear();
        for (const auto& retry_wal_info : need_retry_wals) {
            _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info);
        }
    }
    return notify_status;
}
128
129
0
// Runs one replay round for this table: picks the wals that are due and replays them.
//
// The round is skipped (returning OK) when nothing is pending or when a previous round
// is still in progress (`_replaying_queue` not empty). The pending-wal count used for
// logging is read under `_replay_wal_lock`, because add_wal() may modify the map
// concurrently.
Status WalTable::replay_wals() {
    size_t pending_wal_count = 0;
    {
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        if (_replay_wal_map.empty()) {
            LOG(INFO) << "_replay_wal_map is empty, skip relaying for table_id=" << _table_id;
            return Status::OK();
        }
        if (!_replaying_queue.empty()) {
            LOG(INFO) << "_replaying_queue is not empty, skip relaying for table_id=" << _table_id;
            return Status::OK();
        }
        pending_wal_count = _replay_wal_map.size();
    }
    VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id
               << ", wal size=" << pending_wal_count;
    _pick_relay_wals();
    return _relay_wal_one_by_one();
}
147
148
0
bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
149
0
    if (config::group_commit_wait_replay_wal_finish) {
150
0
        return true;
151
0
    }
152
#ifndef BE_TEST
153
    int64_t replay_interval = 0;
154
    if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
155
        replay_interval =
156
                int64_t(pow(2, config::group_commit_replay_wal_retry_num) *
157
                                config::group_commit_replay_wal_retry_interval_seconds * 1000 +
158
                        (wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) *
159
                                config::group_commit_replay_wal_retry_interval_max_seconds * 1000);
160
    } else {
161
        replay_interval = int64_t(pow(2, wal_info->get_retry_num()) *
162
                                  config::group_commit_replay_wal_retry_interval_seconds * 1000);
163
    }
164
    return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
165
#else
166
0
    return true;
167
0
#endif
168
0
}
169
170
0
// Asks the master FE to roll back the load transaction labelled `label` in database
// `db_id` (loadTxnRollback RPC).
//
// Returns the RPC error if the call itself fails. In that case `result` was never filled
// in, so its default-constructed status must not be reported as the FE's answer.
// Otherwise returns the status reported by the FE.
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
    TLoadTxnRollbackRequest request;
    // this is a fake, fe not check it now
    // should be removed in 3.1, use token instead
    request.__set_auth_code(0);
    request.__set_token(_exec_env->cluster_info()->curr_auth_token);
    request.__set_db_id(db_id);
    request.__set_label(label);
    request.__set_reason("relay wal with label " + label);
    TLoadTxnRollbackResult result;
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->loadTxnRollback(result, request);
            });
    if (!st.ok()) {
        LOG(WARNING) << "abort label " << label << " rpc failed, st:" << st;
        return st;
    }
    auto result_status = Status::create<false>(result.status);
    LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status;
    return result_status;
}
190
191
0
// Replays a single wal file at path `wal`.
//
// The wal id and label are parsed from the file name via WalManager::parse_wal_path.
// Outside unit tests, and when group_commit_wait_replay_wal_finish is not set, the FE is
// first asked to roll back any transaction with that label; that result is ignored. The
// actual load is delegated to _replay_one_wal_with_streamload.
Status WalTable::_replay_wal_internal(const std::string& wal) {
    LOG(INFO) << "start replay wal=" << wal;
    int64_t version = -1;
    int64_t backend_id = -1;
    int64_t wal_id = -1;
    std::string label;
    const auto file_name = io::Path(wal).filename().string();
    RETURN_IF_ERROR(WalManager::parse_wal_path(file_name, version, backend_id, wal_id, label));
#ifndef BE_TEST
    if (!config::group_commit_wait_replay_wal_finish) {
        [[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
    }
#endif
    DBUG_EXECUTE_IF("WalTable.replay_wals.stop",
                    { return Status::InternalError("WalTable.replay_wals.stop"); });
    return _replay_one_wal_with_streamload(wal_id, wal, label);
}
209
210
// Parses one entry of the comma-separated column-id list read from a wal header.
// strtoll accepts leading whitespace and an optional sign. Trailing whitespace is also
// tolerated. Returns false when no digits were parsed, the value overflows, or other
// characters follow the number.
static bool parse_wal_column_id(const std::string& text, int64_t* column_id) {
    const char* begin = text.c_str();
    char* end = nullptr;
    errno = 0;
    const long long value = std::strtoll(begin, &end, 10);
    if (end == begin || errno == ERANGE) {
        return false;
    }
    while (*end != '\0' && std::isspace(static_cast<unsigned char>(*end))) {
        ++end;
    }
    if (*end != '\0') {
        return false;
    }
    *column_id = static_cast<int64_t>(value);
    return true;
}

// Builds the INSERT statement that replays wal file `wal` under load label `label` and
// stores it in `sql_str`.
//
// The column ids recorded in the wal header are mapped to names using the column info
// returned by the FE for this table. Ids the FE does not report are skipped. Returns
// InvalidArgument for a malformed column id, and InternalError when none of the wal's
// columns are known, since the statement would otherwise have an empty column list.
Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
                                    std::string& sql_str) {
    std::string columns;
    RETURN_IF_ERROR(_read_wal_header(wal, columns));
    std::vector<std::string> column_id_vector =
            absl::StrSplit(columns, ",", absl::SkipWhitespace());
    std::map<int64_t, std::string> column_info_map;
    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map));
    // Backtick-quoted, comma-separated column names in wal order. Matched entries are
    // erased so an id repeated in the header is emitted only once.
    std::string name;
    for (const auto& column_id_str : column_id_vector) {
        int64_t column_id = 0;
        if (!parse_wal_column_id(column_id_str, &column_id)) {
            return Status::InvalidArgument("Invalid format, bad column id '{}' in wal {}",
                                           column_id_str, wal);
        }
        auto it = column_info_map.find(column_id);
        if (it == column_info_map.end()) {
            continue;
        }
        if (!name.empty()) {
            name += ',';
        }
        name += '`';
        name += it->second;
        name += '`';
        column_info_map.erase(it);
    }
    if (name.empty()) {
        return Status::InternalError("no column of wal {} found in table {}", wal, _table_id);
    }
    std::stringstream ss;
    ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
       << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
       << std::to_string(_table_id) << "\")";
    sql_str = ss.str();
    return Status::OK();
}
239
240
// Replays wal `wal` (id `wal_id`) under label `label` through the HTTP stream load path.
//
// Builds the replay SQL, runs it via HttpStreamAction::process_put without an HTTP
// request, waits for the load to finish and commits the transaction. On any failure,
// including a failed load status future, rollback_txn is called before the error is
// returned, so the load's transaction is not left open.
Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
                                     const std::string& label) {
    std::string sql_str;
    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str));
    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
    ctx->sql_str = std::move(sql_str);
    ctx->db_id = _db_id;
    ctx->table_id = _table_id;
    ctx->wal_id = wal_id;
    ctx->label = label;
    ctx->need_commit_self = false;
    ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
    ctx->auth.user = "admin";
    ctx->group_commit = false;
    ctx->load_type = TLoadType::MANUL_LOAD;
    ctx->load_src_type = TLoadSourceType::RAW;
    ctx->max_filter_ratio = 1;
    auto st = _http_stream_action->process_put(nullptr, ctx);
    DBUG_EXECUTE_IF("WalTable::_handle_stream_load.fail",
                    { st = Status::InternalError("WalTable::_handle_stream_load.fail"); });
    if (st.ok()) {
        // wait stream load finish. Do not return early on failure: the rollback below
        // must still run.
        st = ctx->load_status_future.get();
    }
    if (st.ok()) {
        if (ctx->status.ok()) {
            // deprecated and should be removed in 3.1, use token instead.
            ctx->auth.auth_code = wal_id;
            st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
        } else {
            st = ctx->status;
        }
    }
    if (!st.ok()) {
        _exec_env->stream_load_executor()->rollback_txn(ctx.get());
    }
    return st;
}
276
277
// Replays one wal via stream load. In unit-test builds (BE_TEST) the load is skipped
// and OK is returned.
Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
                                                 const std::string& label) {
#ifdef BE_TEST
    return Status::OK();
#else
    return _handle_stream_load(wal_id, wal, label);
#endif
}
285
286
0
void WalTable::stop() {
287
0
    do {
288
0
        {
289
0
            std::lock_guard<std::mutex> lock(_replay_wal_lock);
290
0
            if (_replay_wal_map.empty() && _replaying_queue.empty()) {
291
0
                break;
292
0
            }
293
0
            LOG(INFO) << "stopping wal_table,wait for relay wal task done, now "
294
0
                      << _replay_wal_map.size() << " wals wait to replay, "
295
0
                      << _replaying_queue.size() << " wals are replaying";
296
0
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
297
0
        }
298
0
    } while (true);
299
0
}
300
301
0
size_t WalTable::size() {
302
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
303
0
    return _replay_wal_map.size() + _replaying_queue.size();
304
0
}
305
306
// Fetches the column id -> column name mapping of table `tb_id` in database `db_id` from
// the master FE and inserts it into `column_info_map` (existing keys are kept).
//
// Returns an InternalError if no FE master address is known yet, the RPC error if the
// call fails, or the FE-reported status if it is not OK.
Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id,
                                  std::map<int64_t, std::string>& column_info_map) {
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
    if (master_addr.hostname.empty() || master_addr.port == 0) {
        return Status::InternalError<false>("Have not get FE Master heartbeat yet");
    }
    TGetColumnInfoRequest request;
    request.__set_db_id(db_id);
    request.__set_table_id(tb_id);
    TGetColumnInfoResult result;
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->getColumnInfo(result, request);
            }));
    RETURN_IF_ERROR(Status::create<false>(result.status));
    for (const auto& column : result.columns) {
        column_info_map.emplace(column.column_id, column.column_name);
    }
    return Status::OK();
}
335
336
0
// Opens the wal file at `wal_path`, reads its header into `columns` (the header version
// is only logged), and finalizes the reader. The first failing step's status is returned.
Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) {
    auto reader = std::make_shared<doris::WalFileReader>(wal_path);
    RETURN_IF_ERROR(reader->init());
    uint32_t header_version = 0;
    RETURN_IF_ERROR(reader->read_header(header_version, columns));
    VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(header_version)
               << ",columns=" << columns;
    return reader->finalize();
}
347
348
} // namespace doris