Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/autoinc_buffer.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
22
#include <chrono>
23
#include <mutex>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/runtime_profile.h"
29
#include "util/client_cache.h"
30
#include "util/debug_points.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
// Builds a buffer bound to one (db_id, table_id, column_id) triple and obtains
// the token used to submit fetch tasks from the GlobalAutoIncBuffers singleton.
AutoIncIDBuffer::AutoIncIDBuffer(int64_t db_id, int64_t table_id, int64_t column_id)
        : _db_id(db_id),
          _table_id(table_id),
          _column_id(column_id),
          _rpc_token(GlobalAutoIncBuffers::GetInstance()->create_token()) {
    // Nothing else to do: all state is set up by the initializer list.
}
41
42
0
void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
43
0
    if (batch_size > _batch_size) {
44
0
        _batch_size = batch_size;
45
0
    }
46
0
}
47
48
0
// Requests a range of `length` auto-increment ids from the FE and returns the
// first id of the granted range.
//
// At most FETCH_AUTOINC_MAX_RETRY_TIMES attempts are made. An attempt fails when
//   * the RPC status is NOT_MASTER (the next attempt is sent to the master
//     address carried in the response),
//   * the RPC status is otherwise not OK, or
//   * the returned length differs from the requested length.
// Every failed attempt is followed by a 10ms sleep. The status of the latest
// attempt is stored in `_rpc_status`; when no attempt succeeds that status is
// returned as the error.
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
    LOG_INFO(
            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
            "db_id={}, table_id={}, column_id={}, length={}",
            _db_id, _table_id, _column_id, length);

    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;

    _rpc_status = Status::OK();
    // Start with the master FE known to this BE; may be redirected below.
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;

    for (uint32_t attempt = 0; attempt < FETCH_AUTOINC_MAX_RETRY_TIMES; ++attempt) {
        // Debug point: abort the whole retry loop with an injected failure.
        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
            _rpc_status = Status::InternalError<false>("injected error");
            break;
        });

        TAutoIncrementRangeRequest req;
        req.__set_db_id(_db_id);
        req.__set_table_id(_table_id);
        req.__set_column_id(_column_id);
        req.__set_length(length);

        TAutoIncrementRangeResult resp;
        int64_t rpc_elapsed_ns = 0;
        {
            // Time only the RPC itself.
            SCOPED_RAW_TIMER(&rpc_elapsed_ns);
            _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                    master_addr.hostname, master_addr.port,
                    [&req, &resp](FrontendServiceConnection& client) {
                        client->getAutoIncrementRange(resp, req);
                    });
        }

        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
            LOG_WARNING(
                    "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
                    "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
                    "column_id={}",
                    master_addr.hostname, master_addr.port, resp.master_address.hostname,
                    resp.master_address.port, attempt, _db_id, _table_id, _column_id);
            // Redirect the next attempt to the address reported in the response.
            master_addr = resp.master_address;
        } else if (!_rpc_status.ok()) {
            LOG_WARNING(
                    "Failed to fetch auto-increment range, encounter rpc failure. "
                    "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    _rpc_status.to_string(), attempt, _db_id, _table_id, _column_id);
        } else if (resp.length != length) [[unlikely]] {
            // The RPC succeeded but the granted range has an unexpected size;
            // record it as an RPC error so the caller sees a failure.
            auto mismatch_msg = fmt::format(
                    "Failed to fetch auto-increment range, request length={}, but get "
                    "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    length, resp.length, attempt, _db_id, _table_id, _column_id);
            LOG(WARNING) << mismatch_msg;
            _rpc_status = Status::RpcError<true>(mismatch_msg);
        } else {
            LOG_INFO(
                    "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
                    "retry_time={}, db_id={}, table_id={}, column_id={}",
                    master_addr.hostname, master_addr.port, resp.start, resp.length,
                    rpc_elapsed_ns / 1000000, attempt, _db_id, _table_id, _column_id);
            return resp.start;
        }

        // Shared back-off for every failed attempt before the next one.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    // Reaching this point means every attempt failed (or the debug point fired).
    CHECK(!_rpc_status.ok());
    return ResultError(_rpc_status);
}
119
120
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
121
0
        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
122
0
    std::lock_guard<std::mutex> lock {_latch};
123
0
    while (request_length > 0 && !_buffers.empty()) {
124
0
        auto& autoinc_range = _buffers.front();
125
0
        CHECK_GT(autoinc_range.length, 0);
126
0
        auto min_length = std::min(request_length, autoinc_range.length);
127
0
        result->emplace_back(autoinc_range.start, min_length);
128
0
        autoinc_range.consume(min_length);
129
0
        _current_volume -= min_length;
130
0
        request_length -= min_length;
131
0
        if (autoinc_range.empty()) {
132
0
            _buffers.pop_front();
133
0
        }
134
0
    }
135
0
}
136
137
// Obtains `request_length` ids and appends them to `result` as (start, length)
// pairs, blocking until the whole request is satisfied or a fetch fails.
//
// The call holds `_mutex` throughout. Ids are first taken from the buffered
// ranges; while the request is still unsatisfied, a fetch task is launched
// (unless one is already in flight) and waited for, and `_rpc_status` is
// returned if it is not OK after the wait. Once the request is satisfied, a
// further fetch of `_prefetch_size()` ids is launched when no fetch is in
// flight and `_current_volume` is below `_low_water_level_mark()`.
Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
                                         std::vector<std::pair<int64_t, size_t>>* result) {
    std::lock_guard<std::mutex> guard {_mutex};

    while (request_length > 0) {
        _get_autoinc_ranges_from_buffers(request_length, result);
        if (request_length != 0) {
            // Buffers ran dry: make sure a fetch is in flight, then wait for it.
            if (!_is_fetching) {
                const size_t fetch_length =
                        std::max<size_t>(request_length, _prefetch_size());
                RETURN_IF_ERROR(_launch_async_fetch_task(fetch_length));
            }
            _rpc_token->wait();
            CHECK(!_is_fetching);
            if (!_rpc_status.ok()) {
                return _rpc_status;
            }
        }
    }
    CHECK_EQ(request_length, 0);

    // Top the buffer up in the background once it drops below the low-water mark.
    const bool need_prefetch = !_is_fetching && _current_volume < _low_water_level_mark();
    if (need_prefetch) {
        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
    }
    return Status::OK();
}
161
162
0
// Submits a task on `_rpc_token` that fetches `length` ids from the FE and, on
// success, appends the granted range to `_buffers` (under `_latch`) and adds
// `length` to `_current_volume`.
//
// `_is_fetching` is set before submission and cleared by the task on both its
// success and failure paths. If the submission itself fails, no task will ever
// run, so the flag is cleared here before the error is returned; otherwise it
// would stay set and a later sync_request_ids() call would skip launching a
// fetch and then trip its CHECK(!_is_fetching) after an empty wait.
//
// Returns the submission status; the result of the fetch itself is reported
// through `_rpc_status`, which _fetch_ids_from_fe() maintains.
Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
    _is_fetching = true;
    Status submit_status = _rpc_token->submit_func([=, this]() {
        auto&& res = _fetch_ids_from_fe(length);
        if (!res.has_value()) [[unlikely]] {
            auto&& err = res.error();
            LOG_WARNING(
                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
                    "values from fe, db_id={}, table_id={}, column_id={}, status={}",
                    _db_id, _table_id, _column_id, err);
            _is_fetching = false;
            return;
        }
        int64_t start = res.value();
        LOG_INFO(
                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
                "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
                _db_id, _table_id, _column_id, start, length);
        {
            // Publish the new range before signalling that the fetch is done.
            std::lock_guard<std::mutex> lock {_latch};
            _buffers.emplace_back(start, length);
            _current_volume += length;
        }
        _is_fetching = false;
    });
    if (!submit_status.ok()) [[unlikely]] {
        // The task was never queued, so nothing else will reset the flag.
        _is_fetching = false;
        return submit_status;
    }
    return Status::OK();
}
189
190
} // namespace doris