Coverage Report

Created: 2026-05-09 01:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/autoinc_buffer.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
22
#include <chrono>
23
#include <mutex>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/runtime_profile.h"
29
#include "util/client_cache.h"
30
#include "util/debug_points.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
35
// Binds the buffer to a single auto-increment column, identified by its database, table and
// column ids, and obtains from GlobalAutoIncBuffers the token on which fetch tasks are
// submitted.
AutoIncIDBuffer::AutoIncIDBuffer(int64_t db_id, int64_t table_id, int64_t column_id)
        : _db_id(db_id),
          _table_id(table_id),
          _column_id(column_id),
          _rpc_token(GlobalAutoIncBuffers::GetInstance()->create_token()) {}
40
41
8
void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
42
8
    if (batch_size > _batch_size) {
43
4
        _batch_size = batch_size;
44
4
    }
45
8
}
46
47
4
// Requests a contiguous range of `length` auto-increment ids from the master FE.
//
// The RPC is attempted at most FETCH_AUTOINC_MAX_RETRY_TIMES times. Every failed attempt
// (NOT_MASTER, any other rpc error, or a reply whose length differs from the requested one)
// is followed by a short sleep before the next attempt. On NOT_MASTER the next attempt is
// sent to the address reported in the reply.
//
// The outcome of the last attempt is stored in `_rpc_status`.
//
// @param length number of ids to request.
// @return the start of the granted range on success, otherwise ResultError(_rpc_status).
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
    constexpr auto RETRY_INTERVAL = std::chrono::milliseconds(10);
    constexpr int64_t NANOS_PER_MILLI = 1000000;

    LOG_INFO(
            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
            "db_id={}, table_id={}, column_id={}, length={}",
            _db_id, _table_id, _column_id, length);

    _rpc_status = Status::OK();
    TNetworkAddress fe_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;

    // The request is identical for every attempt, so it is built once up front.
    TAutoIncrementRangeRequest request;
    request.__set_db_id(_db_id);
    request.__set_table_id(_table_id);
    request.__set_column_id(_column_id);
    request.__set_length(length);

    for (uint32_t attempt = 0; attempt < FETCH_AUTOINC_MAX_RETRY_TIMES; ++attempt) {
        // Debug point: simulate a failed fetch without contacting FE.
        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
            _rpc_status = Status::InternalError<false>("injected error");
            break;
        });

        // A fresh result object per attempt, so nothing carries over from a failed reply.
        TAutoIncrementRangeResult result;

        int64_t rpc_elapsed_ns = 0;
        {
            SCOPED_RAW_TIMER(&rpc_elapsed_ns);
            _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                    fe_addr.hostname, fe_addr.port,
                    [&request, &result](FrontendServiceConnection& client) {
                        client->getAutoIncrementRange(result, request);
                    });
        }

        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
            // Log both the old and the new address before switching over.
            LOG_WARNING(
                    "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
                    "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
                    "column_id={}",
                    fe_addr.hostname, fe_addr.port, result.master_address.hostname,
                    result.master_address.port, attempt, _db_id, _table_id, _column_id);
            fe_addr = result.master_address;
        } else if (!_rpc_status.ok()) {
            LOG_WARNING(
                    "Failed to fetch auto-increment range, encounter rpc failure. "
                    "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    _rpc_status.to_string(), attempt, _db_id, _table_id, _column_id);
        } else if (result.length != length) [[unlikely]] {
            auto mismatch_msg = fmt::format(
                    "Failed to fetch auto-increment range, request length={}, but get "
                    "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    length, result.length, attempt, _db_id, _table_id, _column_id);
            LOG(WARNING) << mismatch_msg;
            _rpc_status = Status::RpcError<true>(mismatch_msg);
        } else {
            LOG_INFO(
                    "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
                    "retry_time={}, db_id={}, table_id={}, column_id={}",
                    fe_addr.hostname, fe_addr.port, result.start, result.length,
                    rpc_elapsed_ns / NANOS_PER_MILLI, attempt, _db_id, _table_id, _column_id);
            return result.start;
        }

        // Every failure branch above ends here: back off briefly before the next attempt.
        std::this_thread::sleep_for(RETRY_INTERVAL);
    }

    // Reaching this point means no attempt succeeded, so an error must have been recorded.
    CHECK(!_rpc_status.ok());
    return ResultError(_rpc_status);
}
118
119
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
120
8
        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
121
8
    std::lock_guard<std::mutex> lock {_latch};
122
12
    while (request_length > 0 && !_buffers.empty()) {
123
4
        auto& autoinc_range = _buffers.front();
124
4
        CHECK_GT(autoinc_range.length, 0);
125
4
        auto min_length = std::min(request_length, autoinc_range.length);
126
4
        result->emplace_back(autoinc_range.start, min_length);
127
4
        autoinc_range.consume(min_length);
128
4
        _current_volume -= min_length;
129
4
        request_length -= min_length;
130
4
        if (autoinc_range.empty()) {
131
0
            _buffers.pop_front();
132
0
        }
133
4
    }
134
8
}
135
136
// Hands out `request_length` auto-increment ids, blocking until they are all available.
//
// Ids are first taken from the local buffers. While the request is not yet satisfied, a
// fetch task is launched (unless one is already in flight) and waited for, then the buffers
// are drained again. Once the request is satisfied, a further fetch is started in the
// background if the remaining volume has dropped below the low water level mark.
//
// The whole call runs under `_mutex`.
//
// @param request_length number of ids wanted.
// @param result         receives the granted (start, count) ranges.
// @return OK on success; the launch error or `_rpc_status` if a fetch failed. Ranges
//         appended to `result` before the failure stay there.
Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
                                         std::vector<std::pair<int64_t, size_t>>* result) {
    std::lock_guard guard {_mutex};

    while (request_length > 0) {
        _get_autoinc_ranges_from_buffers(request_length, result);
        if (request_length == 0) {
            break;
        }

        if (!_is_fetching) {
            // Fetch at least the prefetch size so later requests can be served locally.
            const size_t fetch_length = std::max<size_t>(request_length, _prefetch_size());
            RETURN_IF_ERROR(_launch_async_fetch_task(fetch_length));
        }

        // Block until the in-flight fetch task has finished.
        _rpc_token->wait();
        CHECK(!_is_fetching);

        if (!_rpc_status.ok()) {
            return _rpc_status;
        }
    }
    CHECK_EQ(request_length, 0);

    // `_is_fetching` is tested first, so `_current_volume` is only read when no fetch is
    // in flight.
    const bool should_prefetch = !_is_fetching && _current_volume < _low_water_level_mark();
    if (should_prefetch) {
        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
    }
    return Status::OK();
}
160
161
4
// Submits a task on `_rpc_token` that fetches `length` ids from FE and, on success, appends
// the granted range to `_buffers` (under `_latch`) and adds `length` to `_current_volume`.
//
// `_is_fetching` is raised before submission and lowered by the task when it finishes,
// whether the fetch succeeded or failed. If the submission itself fails, the flag is lowered
// here: otherwise it would stay set with no task queued to clear it, and the next
// sync_request_ids() call would skip launching a fetch and then fail its
// CHECK(!_is_fetching).
//
// @param length number of ids to fetch.
// @return OK when the task was submitted, otherwise the submission error.
Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
    _is_fetching = true;
    Status submit_status = _rpc_token->submit_func([=, this]() {
        auto&& res = _fetch_ids_from_fe(length);
        if (!res.has_value()) [[unlikely]] {
            auto&& err = res.error();
            LOG_WARNING(
                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
                    "values from fe, db_id={}, table_id={}, column_id={}, status={}",
                    _db_id, _table_id, _column_id, err);
            _is_fetching = false;
            return;
        }
        int64_t start = res.value();
        LOG_INFO(
                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
                "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
                _db_id, _table_id, _column_id, start, length);
        {
            // Publish the new range under the same latch that guards draining the buffers.
            std::lock_guard<std::mutex> lock {_latch};
            _buffers.emplace_back(start, length);
            _current_volume += length;
        }
        _is_fetching = false;
    });
    if (!submit_status.ok()) [[unlikely]] {
        // NOTE(review): assumes a failed submit_func() never runs the task - confirm against
        // ThreadPoolToken. In that case nobody else would reset the flag.
        _is_fetching = false;
        return submit_status;
    }
    return Status::OK();
}
188
189
} // namespace doris