Coverage Report

Created: 2026-06-09 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/autoinc_buffer.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
22
#include <chrono>
23
#include <mutex>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/runtime_profile.h"
29
#include "util/client_cache.h"
30
#include "util/debug_points.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
35
AutoIncIDBuffer::AutoIncIDBuffer(int64_t db_id, int64_t table_id, int64_t column_id)
36
109
        : _db_id(db_id),
37
109
          _table_id(table_id),
38
109
          _column_id(column_id),
39
109
          _rpc_token(GlobalAutoIncBuffers::GetInstance()->create_token()) {}
40
41
236
void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
42
236
    if (batch_size > _batch_size) {
43
68
        _batch_size = batch_size;
44
68
    }
45
236
}
46
47
83
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
48
83
    LOG_INFO(
49
83
            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
50
83
            "db_id={}, table_id={}, column_id={}, length={}",
51
83
            _db_id, _table_id, _column_id, length);
52
83
    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
53
83
    _rpc_status = Status::OK();
54
83
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
55
83
    for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
56
83
        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
57
83
            _rpc_status = Status::InternalError<false>("injected error");
58
83
            break;
59
83
        });
60
83
        TAutoIncrementRangeRequest request;
61
83
        TAutoIncrementRangeResult result;
62
83
        request.__set_db_id(_db_id);
63
83
        request.__set_table_id(_table_id);
64
83
        request.__set_column_id(_column_id);
65
83
        request.__set_length(length);
66
67
83
        int64_t get_auto_inc_range_rpc_ns = 0;
68
83
        {
69
83
            SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns);
70
83
            _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
71
83
                    master_addr.hostname, master_addr.port,
72
83
                    [&request, &result](FrontendServiceConnection& client) {
73
83
                        client->getAutoIncrementRange(result, request);
74
83
                    });
75
83
        }
76
77
83
        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
78
0
            LOG_WARNING(
79
0
                    "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
80
0
                    "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
81
0
                    "column_id={}",
82
0
                    master_addr.hostname, master_addr.port, result.master_address.hostname,
83
0
                    result.master_address.port, retry_times, _db_id, _table_id, _column_id);
84
0
            master_addr = result.master_address;
85
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
86
0
            continue;
87
0
        }
88
89
83
        if (!_rpc_status.ok()) {
90
0
            LOG_WARNING(
91
0
                    "Failed to fetch auto-increment range, encounter rpc failure. "
92
0
                    "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
93
0
                    _rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
94
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
95
0
            continue;
96
0
        }
97
83
        if (result.length != length) [[unlikely]] {
98
0
            auto msg = fmt::format(
99
0
                    "Failed to fetch auto-increment range, request length={}, but get "
100
0
                    "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
101
0
                    length, result.length, retry_times, _db_id, _table_id, _column_id);
102
0
            LOG(WARNING) << msg;
103
0
            _rpc_status = Status::RpcError<true>(msg);
104
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
105
0
            continue;
106
0
        }
107
108
83
        LOG_INFO(
109
83
                "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
110
83
                "retry_time={}, db_id={}, table_id={}, column_id={}",
111
83
                master_addr.hostname, master_addr.port, result.start, result.length,
112
83
                get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
113
83
        return result.start;
114
83
    }
115
83
    CHECK(!_rpc_status.ok());
116
0
    return ResultError(_rpc_status);
117
83
}
118
119
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
120
282
        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
121
282
    std::lock_guard<std::mutex> lock {_latch};
122
482
    while (request_length > 0 && !_buffers.empty()) {
123
200
        auto& autoinc_range = _buffers.front();
124
200
        CHECK_GT(autoinc_range.length, 0);
125
200
        auto min_length = std::min(request_length, autoinc_range.length);
126
200
        result->emplace_back(autoinc_range.start, min_length);
127
200
        autoinc_range.consume(min_length);
128
200
        _current_volume -= min_length;
129
200
        request_length -= min_length;
130
200
        if (autoinc_range.empty()) {
131
3
            _buffers.pop_front();
132
3
        }
133
200
    }
134
282
}
135
136
Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
137
202
                                         std::vector<std::pair<int64_t, size_t>>* result) {
138
202
    std::lock_guard<std::mutex> lock(_mutex);
139
284
    while (request_length > 0) {
140
279
        _get_autoinc_ranges_from_buffers(request_length, result);
141
279
        if (request_length == 0) {
142
197
            break;
143
197
        }
144
82
        if (!_is_fetching) {
145
82
            RETURN_IF_ERROR(
146
82
                    _launch_async_fetch_task(std::max<size_t>(request_length, _prefetch_size())));
147
82
        }
148
82
        _rpc_token->wait();
149
82
        CHECK(!_is_fetching);
150
82
        if (!_rpc_status.ok()) {
151
0
            return _rpc_status;
152
0
        }
153
82
    }
154
202
    CHECK_EQ(request_length, 0);
155
202
#ifndef BE_TEST
156
202
    if (!_is_fetching && _current_volume < _low_water_level_mark()) {
157
1
        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
158
1
    }
159
202
#endif
160
202
    return Status::OK();
161
202
}
162
163
83
Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
164
83
    _is_fetching = true;
165
83
    RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
166
83
        auto&& res = _fetch_ids_from_fe(length);
167
83
        if (!res.has_value()) [[unlikely]] {
168
83
            auto&& err = res.error();
169
83
            LOG_WARNING(
170
83
                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
171
83
                    "values from fe, db_id={}, table_id={}, column_id={}, status={}",
172
83
                    _db_id, _table_id, _column_id, err);
173
83
            _is_fetching = false;
174
83
            return;
175
83
        }
176
83
        int64_t start = res.value();
177
83
        LOG_INFO(
178
83
                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
179
83
                "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
180
83
                _db_id, _table_id, _column_id, start, length);
181
83
        {
182
83
            std::lock_guard<std::mutex> lock {_latch};
183
83
            _buffers.emplace_back(start, length);
184
83
            _current_volume += length;
185
83
        }
186
83
        _is_fetching = false;
187
83
    }));
188
83
    return Status::OK();
189
83
}
190
191
} // namespace doris