Coverage Report

Created: 2026-06-04 22:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/wal/wal_table.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/group_commit/wal/wal_table.h"
19
20
#include <absl/strings/str_split.h>
21
#include <thrift/protocol/TDebugProtocol.h>
22
23
#include "io/fs/local_file_system.h"
24
#include "io/fs/stream_load_pipe.h"
25
#include "load/group_commit/wal/wal_manager.h"
26
#include "runtime/fragment_mgr.h"
27
#include "service/http/action/http_stream.h"
28
#include "service/http/action/stream_load.h"
29
#include "service/http/ev_http_server.h"
30
#include "service/http/http_common.h"
31
#include "service/http/http_headers.h"
32
#include "service/http/utils.h"
33
#include "util/client_cache.h"
34
#include "util/path_util.h"
35
#include "util/thrift_rpc_helper.h"
36
37
namespace doris {
38
39
// Exported counter "group_commit_wal_fail"; incremented by one each time a WAL replay
// attempt in this file ends in a status that is not treated as finished.
bvar::Adder<uint64_t> wal_fail("group_commit_wal_fail");
40
41
// Remembers the execution environment and the (db, table) ids this object serves, and
// creates the HttpStreamAction that is later used to replay WALs.
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {
    // _exec_env was just initialised from exec_env, so both name the same environment.
    _http_stream_action = std::make_shared<HttpStreamAction>(_exec_env);
}
45
0
// The destructor body is empty: no explicit cleanup is performed here.
WalTable::~WalTable() {}
46
47
0
void WalTable::add_wal(int64_t wal_id, std::string wal) {
48
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
49
0
    LOG(INFO) << "add replay wal=" << wal;
50
0
    auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
51
0
    _replay_wal_map.emplace(wal, wal_info);
52
0
}
53
54
0
void WalTable::_pick_relay_wals() {
55
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
56
0
    std::vector<std::string> need_replay_wals;
57
0
    std::vector<std::string> need_erase_wals;
58
0
    for (const auto& [wal_path, wal_info] : _replay_wal_map) {
59
0
        if (config::group_commit_wait_replay_wal_finish &&
60
0
            wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
61
0
            LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry "
62
0
                         << wal_info->get_retry_num() << " times";
63
0
            [[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path(
64
0
                    wal_path, _table_id, wal_info->get_wal_id());
65
0
            auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
66
0
            if (!notify_st.ok()) {
67
0
                LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
68
0
            }
69
0
            need_erase_wals.push_back(wal_path);
70
0
            continue;
71
0
        }
72
0
        if (_need_replay(wal_info)) {
73
0
            need_replay_wals.push_back(wal_path);
74
0
        }
75
0
    }
76
0
    for (const auto& wal : need_erase_wals) {
77
0
        _replay_wal_map.erase(wal);
78
0
    }
79
0
    std::sort(need_replay_wals.begin(), need_replay_wals.end());
80
0
    for (const auto& wal : need_replay_wals) {
81
0
        _replaying_queue.emplace_back(_replay_wal_map[wal]);
82
0
        _replay_wal_map.erase(wal);
83
0
    }
84
0
}
85
86
0
// Replays every WAL currently in _replaying_queue, in queue order.
//
// A WAL counts as finished when the replay status is OK, PUBLISH_TIMEOUT, NOT_FOUND or
// DATA_QUALITY_ERROR, or when the message says its label "has already been used" by a
// COMMITTED / VISIBLE transaction. Finished WALs are deleted and, when
// group_commit_wait_replay_wal_finish is set, their waiters are notified. Every other WAL
// is put back into _replay_wal_map so a later round retries it.
//
// Returns OK, or the error of the first failed notify_relay_wal() call. In the error case
// processing stops, but the queue is still drained: WALs that were not reached are
// returned to _replay_wal_map untouched (their retry counter is not bumped). Previously
// the early return left _replaying_queue non-empty, which made replay_wals() skip this
// table from then on and kept stop() waiting indefinitely.
Status WalTable::_relay_wal_one_by_one() {
    std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
    Status result = Status::OK();
    auto next = _replaying_queue.begin();
    while (next != _replaying_queue.end()) {
        const auto& wal_info = *next;
        ++next;
        wal_info->add_retry_num();
        Status st;
        int64_t file_size = 0;
        const std::filesystem::path file_path(wal_info->get_wal_path());
        // Non-throwing overloads: a file vanishing between the two calls (or an
        // unreadable directory) must not escape as an exception.
        std::error_code ec;
        if (!std::filesystem::exists(file_path, ec)) {
            st = Status::InternalError("wal file {} does not exist", wal_info->get_wal_path());
        } else {
            const auto size_on_disk = std::filesystem::file_size(file_path, ec);
            if (!ec) {
                // Only used for the log line below.
                file_size = static_cast<int64_t>(size_on_disk);
            }
            st = _replay_wal_internal(wal_info->get_wal_path());
        }
        const auto msg = st.msg();
        const bool label_already_finished =
                msg.find("has already been used") != msg.npos &&
                (msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") != msg.npos);
        const bool finished = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
                              st.is<ErrorCode::NOT_FOUND>() ||
                              st.is<ErrorCode::DATA_QUALITY_ERROR>() || label_already_finished;
        if (!finished) {
            doris::wal_fail << 1;
            LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
                         << ", st=" << st.to_string();
            need_retry_wals.push_back(wal_info);
            continue;
        }
        LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
                  << ", st=" << st.to_string() << ", file size=" << file_size;
        // delete wal
        WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, wal_info->get_wal_id()),
                      "failed to delete wal=" + wal_info->get_wal_path());
        if (config::group_commit_wait_replay_wal_finish) {
            result = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
            if (!result.ok()) {
                // Stop here, but fall through to the bookkeeping below instead of
                // returning with a half-processed queue.
                break;
            }
        }
    }
    {
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        // Only non-empty after an early stop: hand the unreached WALs back as pending.
        for (; next != _replaying_queue.end(); ++next) {
            _replay_wal_map.emplace((*next)->get_wal_path(), *next);
        }
        _replaying_queue.clear();
        for (const auto& retry_wal_info : need_retry_wals) {
            _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info);
        }
    }
    return result;
}
128
129
0
// Runs one replay round for this table.
//
// Does nothing (and returns OK) when there is no pending WAL or when a previous round
// still has WALs in _replaying_queue. Otherwise picks the WALs that are due and replays
// them one by one, returning the status of that replay pass.
Status WalTable::replay_wals() {
    size_t pending_wal_count = 0;
    {
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        if (_replay_wal_map.empty()) {
            LOG(INFO) << "_replay_wal_map is empty, skip relaying for table_id=" << _table_id;
            return Status::OK();
        }
        if (!_replaying_queue.empty()) {
            LOG(INFO) << "_replaying_queue is not empty, skip relaying for table_id=" << _table_id;
            return Status::OK();
        }
        // Snapshot the size while the lock is held; the map is mutated by other members
        // under this lock, so reading it after the scope ends would be unsynchronized.
        pending_wal_count = _replay_wal_map.size();
    }
    VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id
               << ", wal size=" << pending_wal_count;
    _pick_relay_wals();
    return _relay_wal_one_by_one();
}
147
148
0
bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
149
0
    if (config::group_commit_wait_replay_wal_finish) {
150
0
        return true;
151
0
    }
152
#ifndef BE_TEST
153
    int64_t replay_interval = 0;
154
    if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
155
        replay_interval =
156
                int64_t(pow(2, config::group_commit_replay_wal_retry_num) *
157
                                config::group_commit_replay_wal_retry_interval_seconds * 1000 +
158
                        (wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) *
159
                                config::group_commit_replay_wal_retry_interval_max_seconds * 1000);
160
    } else {
161
        replay_interval = int64_t(pow(2, wal_info->get_retry_num()) *
162
                                  config::group_commit_replay_wal_retry_interval_seconds * 1000);
163
    }
164
    return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
165
#else
166
0
    return true;
167
0
#endif
168
0
}
169
170
0
// Asks the master FE to roll back the load transaction identified by (db_id, label).
//
// Returns the RPC error when the call itself fails, otherwise the status carried in the
// FE's rollback result. Both outcomes are logged.
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
    TLoadTxnRollbackRequest request;
    // this is a fake, fe not check it now
    // should be removed in 3.1, use token instead
    request.__set_auth_code(0);
    request.__set_token(_exec_env->cluster_info()->curr_auth_token);
    request.__set_db_id(db_id);
    request.__set_label(label);
    request.__set_reason("relay wal with label " + label);

    TLoadTxnRollbackResult result;
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
    auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->loadTxnRollback(result, request);
            });
    if (!rpc_st.ok()) {
        LOG(WARNING) << "abort label " << label << ", rpc error:" << rpc_st;
        return rpc_st;
    }
    auto result_status = Status::create<false>(result.status);
    LOG(INFO) << "abort label " << label << ", result_status:" << result_status;
    return result_status;
}
195
196
0
// Replays a single WAL file.
//
// The file name of `wal` is parsed into version / backend id / wal id / label. Outside
// BE_TEST builds, and only when group_commit_wait_replay_wal_finish is off, the
// transaction with that label is aborted first (result ignored). The WAL is then
// replayed through the stream-load path.
Status WalTable::_replay_wal_internal(const std::string& wal) {
    LOG(INFO) << "start replay wal=" << wal;
    int64_t version = -1;
    int64_t backend_id = -1;
    int64_t wal_id = -1;
    std::string label;
    auto file_name = io::Path(wal).filename().string();
    RETURN_IF_ERROR(WalManager::parse_wal_path(file_name, version, backend_id, wal_id, label));
#ifndef BE_TEST
    if (!config::group_commit_wait_replay_wal_finish) {
        // Best effort: the outcome of the abort does not stop the replay.
        [[maybe_unused]] auto abort_st = _try_abort_txn(_db_id, label);
    }
#endif
    // Debug-point hook that forces this replay to fail.
    DBUG_EXECUTE_IF("WalTable.replay_wals.stop",
                    { return Status::InternalError("WalTable.replay_wals.stop"); });
    return _replay_one_wal_with_streamload(wal_id, wal, label);
}
214
215
// Builds the "insert into ... select ... from http_stream(...)" statement used to replay
// one WAL.
//
// wal     - path of the WAL file; its header supplies a comma-separated list of column ids.
// label   - load label placed after WITH LABEL.
// sql_str - out: receives the generated statement on success.
//
// Column ids are translated to names via _get_column_info(); ids unknown to the FE are
// skipped, and an id that appears twice is emitted only once. Returns InvalidArgument
// when a header entry does not start with a number, or any error from reading the header
// or fetching the column info.
Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
                                    std::string& sql_str) {
    std::string columns;
    RETURN_IF_ERROR(_read_wal_header(wal, columns));
    std::vector<std::string> column_id_vector =
            absl::StrSplit(columns, ",", absl::SkipWhitespace());
    std::map<int64_t, std::string> column_info_map;
    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map));

    // Comma-separated, back-quoted column names, e.g. `a`,`b`.
    std::string name;
    for (const auto& column_id_str : column_id_vector) {
        // strtoll never throws; "no digits consumed" is reported through the end pointer.
        char* parse_end = nullptr;
        const int64_t column_id = std::strtoll(column_id_str.c_str(), &parse_end, 10);
        if (parse_end == column_id_str.c_str()) {
            return Status::InvalidArgument("Invalid format, column id {} is not a number",
                                           column_id_str);
        }
        auto it = column_info_map.find(column_id);
        if (it == column_info_map.end()) {
            continue;
        }
        if (!name.empty()) {
            name += ',';
        }
        name += '`';
        name += it->second;
        name += '`';
        // Consume the entry so a repeated id is not emitted twice.
        column_info_map.erase(it);
    }

    std::stringstream ss;
    ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
       << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
       << std::to_string(_table_id) << "\")";
    sql_str = ss.str();
    return Status::OK();
}
244
245
// Replays one WAL by running the generated insert statement through HttpStreamAction and
// then committing the transaction from this side (need_commit_self is false).
//
// Returns the first failure among: building the SQL, process_put(), the load-status
// future, the context status, and commit_txn(). When the status reaching the end of the
// function is not OK, the transaction is rolled back.
Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
                                     const std::string& label) {
    std::string sql_str;
    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str));

    auto ctx = std::make_shared<StreamLoadContext>(_exec_env);
    // What to load and where.
    ctx->sql_str = sql_str;
    ctx->db_id = _db_id;
    ctx->table_id = _table_id;
    ctx->wal_id = wal_id;
    ctx->label = label;
    // How to load it.
    ctx->need_commit_self = false;
    ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
    ctx->auth.user = "admin";
    ctx->group_commit = false;
    ctx->load_type = TLoadType::MANUL_LOAD;
    ctx->load_src_type = TLoadSourceType::RAW;
    ctx->max_filter_ratio = 1;

    auto st = _http_stream_action->process_put(nullptr, ctx);
    // Debug-point hook that forces the failure path.
    DBUG_EXECUTE_IF("WalTable::_handle_stream_load.fail",
                    { st = Status::InternalError("WalTable::_handle_stream_load.fail"); });
    if (st.ok()) {
        // wait stream load finish
        // NOTE(review): an error from the future returns right here and therefore skips
        // the rollback_txn() call below - confirm that this is intended.
        RETURN_IF_ERROR(ctx->load_status_future.get());
        st = ctx->status;
        if (st.ok()) {
            // deprecated and should be removed in 3.1, use token instead.
            ctx->auth.auth_code = wal_id;
            st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
        }
    }
    if (!st.ok()) {
        _exec_env->stream_load_executor()->rollback_txn(ctx.get());
    }
    return st;
}
281
282
// Thin dispatch around _handle_stream_load(): BE_TEST builds report success without
// doing any work, all other builds perform the real stream load.
Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
                                                 const std::string& label) {
#ifdef BE_TEST
    return Status::OK();
#else
    return _handle_stream_load(wal_id, wal, label);
#endif
}
290
291
0
void WalTable::stop() {
292
0
    do {
293
0
        {
294
0
            std::lock_guard<std::mutex> lock(_replay_wal_lock);
295
0
            if (_replay_wal_map.empty() && _replaying_queue.empty()) {
296
0
                break;
297
0
            }
298
0
            LOG(INFO) << "stopping wal_table,wait for relay wal task done, now "
299
0
                      << _replay_wal_map.size() << " wals wait to replay, "
300
0
                      << _replaying_queue.size() << " wals are replaying";
301
0
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
302
0
        }
303
0
    } while (true);
304
0
}
305
306
0
size_t WalTable::size() {
307
0
    std::lock_guard<std::mutex> lock(_replay_wal_lock);
308
0
    return _replay_wal_map.size() + _replaying_queue.size();
309
0
}
310
311
// Fetches the columns of table `tb_id` in database `db_id` from the master FE.
//
// column_info_map - out: one (column id -> column name) entry is added per returned
//                   column; existing keys are kept because emplace() does not overwrite.
//
// Returns InternalError when no master FE address is known yet, the RPC error when the
// call fails, or the status reported by the FE. The map is only filled on success.
Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id,
                                  std::map<int64_t, std::string>& column_info_map) {
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
    if (master_addr.hostname.empty() || master_addr.port == 0) {
        return Status::InternalError<false>("Have not get FE Master heartbeat yet");
    }

    TGetColumnInfoRequest request;
    request.__set_db_id(db_id);
    request.__set_table_id(tb_id);
    TGetColumnInfoResult result;
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->getColumnInfo(result, request);
            }));

    Status status = Status::create<false>(result.status);
    if (!status.ok()) {
        return status;
    }
    // Iterate the result in place; no copy of the vector or of its elements is needed.
    for (const auto& column : result.columns) {
        column_info_map.emplace(column.column_id, column.column_name);
    }
    return status;
}
340
341
0
// Opens the WAL at `wal_path`, reads its header and finalizes the reader.
//
// columns - out: filled by WalFileReader::read_header().
//
// Returns the first error from init(), read_header() or finalize(). The header version
// is only used for the debug log.
Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) {
    auto wal_reader = std::make_shared<doris::WalFileReader>(wal_path);
    RETURN_IF_ERROR(wal_reader->init());

    uint32_t version = 0;
    RETURN_IF_ERROR(wal_reader->read_header(version, columns));
    VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version)
               << ",columns=" << columns;

    return wal_reader->finalize();
}
352
353
} // namespace doris