Coverage Report

Created: 2026-04-15 09:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/autoinc_buffer.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
22
#include <chrono>
23
#include <mutex>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/runtime_profile.h"
29
#include "util/client_cache.h"
30
#include "util/debug_points.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
35
AutoIncIDBuffer::AutoIncIDBuffer(int64_t db_id, int64_t table_id, int64_t column_id)
36
108
        : _db_id(db_id),
37
108
          _table_id(table_id),
38
108
          _column_id(column_id),
39
108
          _rpc_token(GlobalAutoIncBuffers::GetInstance()->create_token()) {}
40
41
255
void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
42
255
    if (batch_size > _batch_size) {
43
70
        _batch_size = batch_size;
44
70
    }
45
255
}
46
47
85
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
48
85
    LOG_INFO(
49
85
            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
50
85
            "db_id={}, table_id={}, column_id={}, length={}",
51
85
            _db_id, _table_id, _column_id, length);
52
85
    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
53
85
    _rpc_status = Status::OK();
54
85
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
55
85
    for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
56
85
        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
57
85
            _rpc_status = Status::InternalError<false>("injected error");
58
85
            break;
59
85
        });
60
85
        TAutoIncrementRangeRequest request;
61
85
        TAutoIncrementRangeResult result;
62
85
        request.__set_db_id(_db_id);
63
85
        request.__set_table_id(_table_id);
64
85
        request.__set_column_id(_column_id);
65
85
        request.__set_length(length);
66
67
85
        int64_t get_auto_inc_range_rpc_ns = 0;
68
85
        {
69
85
            SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns);
70
85
            _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
71
85
                    master_addr.hostname, master_addr.port,
72
85
                    [&request, &result](FrontendServiceConnection& client) {
73
85
                        client->getAutoIncrementRange(result, request);
74
85
                    });
75
85
        }
76
77
85
        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
78
0
            LOG_WARNING(
79
0
                    "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
80
0
                    "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
81
0
                    "column_id={}",
82
0
                    master_addr.hostname, master_addr.port, result.master_address.hostname,
83
0
                    result.master_address.port, retry_times, _db_id, _table_id, _column_id);
84
0
            master_addr = result.master_address;
85
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
86
0
            continue;
87
0
        }
88
89
85
        if (!_rpc_status.ok()) {
90
0
            LOG_WARNING(
91
0
                    "Failed to fetch auto-increment range, encounter rpc failure. "
92
0
                    "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
93
0
                    _rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
94
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
95
0
            continue;
96
0
        }
97
85
        if (result.length != length) [[unlikely]] {
98
0
            auto msg = fmt::format(
99
0
                    "Failed to fetch auto-increment range, request length={}, but get "
100
0
                    "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
101
0
                    length, result.length, retry_times, _db_id, _table_id, _column_id);
102
0
            LOG(WARNING) << msg;
103
0
            _rpc_status = Status::RpcError<true>(msg);
104
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
105
0
            continue;
106
0
        }
107
108
85
        LOG_INFO(
109
85
                "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
110
85
                "retry_time={}, db_id={}, table_id={}, column_id={}",
111
85
                master_addr.hostname, master_addr.port, result.start, result.length,
112
85
                get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
113
85
        return result.start;
114
85
    }
115
85
    CHECK(!_rpc_status.ok());
116
0
    return ResultError(_rpc_status);
117
85
}
118
119
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
120
272
        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
121
272
    std::lock_guard<std::mutex> lock {_latch};
122
460
    while (request_length > 0 && !_buffers.empty()) {
123
188
        auto& autoinc_range = _buffers.front();
124
188
        CHECK_GT(autoinc_range.length, 0);
125
188
        auto min_length = std::min(request_length, autoinc_range.length);
126
188
        result->emplace_back(autoinc_range.start, min_length);
127
188
        autoinc_range.consume(min_length);
128
188
        _current_volume -= min_length;
129
188
        request_length -= min_length;
130
188
        if (autoinc_range.empty()) {
131
0
            _buffers.pop_front();
132
0
        }
133
188
    }
134
272
}
135
136
Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
137
193
                                         std::vector<std::pair<int64_t, size_t>>* result) {
138
193
    std::lock_guard<std::mutex> lock(_mutex);
139
277
    while (request_length > 0) {
140
272
        _get_autoinc_ranges_from_buffers(request_length, result);
141
272
        if (request_length == 0) {
142
188
            break;
143
188
        }
144
84
        if (!_is_fetching) {
145
84
            RETURN_IF_ERROR(
146
84
                    _launch_async_fetch_task(std::max<size_t>(request_length, _prefetch_size())));
147
84
        }
148
84
        _rpc_token->wait();
149
84
        CHECK(!_is_fetching);
150
84
        if (!_rpc_status.ok()) {
151
0
            return _rpc_status;
152
0
        }
153
84
    }
154
193
    CHECK_EQ(request_length, 0);
155
193
    if (!_is_fetching && _current_volume < _low_water_level_mark()) {
156
1
        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
157
1
    }
158
193
    return Status::OK();
159
193
}
160
161
85
Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
162
85
    _is_fetching = true;
163
85
    RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
164
85
        auto&& res = _fetch_ids_from_fe(length);
165
85
        if (!res.has_value()) [[unlikely]] {
166
85
            auto&& err = res.error();
167
85
            LOG_WARNING(
168
85
                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
169
85
                    "values from fe, db_id={}, table_id={}, column_id={}, status={}",
170
85
                    _db_id, _table_id, _column_id, err);
171
85
            _is_fetching = false;
172
85
            return;
173
85
        }
174
85
        int64_t start = res.value();
175
85
        LOG_INFO(
176
85
                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
177
85
                "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
178
85
                _db_id, _table_id, _column_id, start, length);
179
85
        {
180
85
            std::lock_guard<std::mutex> lock {_latch};
181
85
            _buffers.emplace_back(start, length);
182
85
            _current_volume += length;
183
85
        }
184
85
        _is_fetching = false;
185
85
    }));
186
85
    return Status::OK();
187
85
}
188
189
} // namespace doris