Coverage Report

Created: 2026-03-13 14:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/autoinc_buffer.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
22
#include <chrono>
23
#include <mutex>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/runtime_profile.h"
29
#include "util/client_cache.h"
30
#include "util/debug_points.h"
31
#include "util/thrift_rpc_helper.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
AutoIncIDBuffer::AutoIncIDBuffer(int64_t db_id, int64_t table_id, int64_t column_id)
37
109
        : _db_id(db_id),
38
109
          _table_id(table_id),
39
109
          _column_id(column_id),
40
109
          _rpc_token(GlobalAutoIncBuffers::GetInstance()->create_token()) {}
41
42
248
void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
43
248
    if (batch_size > _batch_size) {
44
70
        _batch_size = batch_size;
45
70
    }
46
248
}
47
48
85
// Asks the master FE for a range of `length` auto-increment values.
//
// The getAutoIncrementRange RPC is attempted at most FETCH_AUTOINC_MAX_RETRY_TIMES
// times. A failed attempt (NOT_MASTER, any other RPC error, or a reply whose length
// differs from the requested one) is followed by a 10ms pause before the next
// attempt. On NOT_MASTER the next attempt targets the master address carried in the
// reply. The outcome of the last attempt is left in `_rpc_status`.
//
// Returns the start of the granted range on success, otherwise
// ResultError(_rpc_status).
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
    LOG_INFO(
            "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
            "db_id={}, table_id={}, column_id={}, length={}",
            _db_id, _table_id, _column_id, length);
    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
    constexpr auto RETRY_INTERVAL = std::chrono::milliseconds(10);
    _rpc_status = Status::OK();
    TNetworkAddress fe_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
    for (uint32_t attempt = 0; attempt < FETCH_AUTOINC_MAX_RETRY_TIMES; ++attempt) {
        // Debug point: inject a failure and leave the retry loop immediately.
        DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
            _rpc_status = Status::InternalError<false>("injected error");
            break;
        });
        TAutoIncrementRangeRequest req;
        TAutoIncrementRangeResult resp;
        req.__set_db_id(_db_id);
        req.__set_table_id(_table_id);
        req.__set_column_id(_column_id);
        req.__set_length(length);

        int64_t rpc_elapsed_ns = 0;
        {
            // The timer covers only the RPC itself.
            SCOPED_RAW_TIMER(&rpc_elapsed_ns);
            _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                    fe_addr.hostname, fe_addr.port,
                    [&req, &resp](FrontendServiceConnection& client) {
                        client->getAutoIncrementRange(resp, req);
                    });
        }

        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
            LOG_WARNING(
                    "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
                    "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
                    "column_id={}",
                    fe_addr.hostname, fe_addr.port, resp.master_address.hostname,
                    resp.master_address.port, attempt, _db_id, _table_id, _column_id);
            // Redirect the next attempt to the address reported in the reply.
            fe_addr = resp.master_address;
        } else if (!_rpc_status.ok()) {
            LOG_WARNING(
                    "Failed to fetch auto-increment range, encounter rpc failure. "
                    "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    _rpc_status.to_string(), attempt, _db_id, _table_id, _column_id);
        } else if (resp.length != length) [[unlikely]] {
            auto msg = fmt::format(
                    "Failed to fetch auto-increment range, request length={}, but get "
                    "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
                    length, resp.length, attempt, _db_id, _table_id, _column_id);
            LOG(WARNING) << msg;
            _rpc_status = Status::RpcError<true>(msg);
        } else {
            LOG_INFO(
                    "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
                    "retry_time={}, db_id={}, table_id={}, column_id={}",
                    fe_addr.hostname, fe_addr.port, resp.start, resp.length,
                    rpc_elapsed_ns / 1000000, attempt, _db_id, _table_id, _column_id);
            return resp.start;
        }

        // Every failed attempt pauses briefly before the loop moves on.
        std::this_thread::sleep_for(RETRY_INTERVAL);
    }
    // Reaching this point means no attempt succeeded, so the status must be an error.
    CHECK(!_rpc_status.ok());
    return ResultError(_rpc_status);
}
119
120
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
121
281
        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
122
281
    std::lock_guard<std::mutex> lock {_latch};
123
478
    while (request_length > 0 && !_buffers.empty()) {
124
197
        auto& autoinc_range = _buffers.front();
125
197
        CHECK_GT(autoinc_range.length, 0);
126
197
        auto min_length = std::min(request_length, autoinc_range.length);
127
197
        result->emplace_back(autoinc_range.start, min_length);
128
197
        autoinc_range.consume(min_length);
129
197
        _current_volume -= min_length;
130
197
        request_length -= min_length;
131
197
        if (autoinc_range.empty()) {
132
0
            _buffers.pop_front();
133
0
        }
134
197
    }
135
281
}
136
137
// Collects `request_length` auto-increment values into `result` as
// (start, length) pairs, blocking until enough ranges are available.
//
// Values are taken from the local buffers first. While the request is still not
// satisfied, a fetch task is launched (unless one is already running) and the
// call waits on `_rpc_token` for it to finish. After the request is served, a
// further fetch is launched if the remaining volume is below the low water mark.
//
// Returns the error from launching a fetch task, or `_rpc_status` when a waited-for
// fetch failed; Status::OK() otherwise. `_mutex` is held for the whole call.
Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
                                         std::vector<std::pair<int64_t, size_t>>* result) {
    std::lock_guard<std::mutex> guard {_mutex};
    while (request_length > 0) {
        _get_autoinc_ranges_from_buffers(request_length, result);
        if (request_length > 0) {
            // The buffers ran dry: make sure a fetch is in flight, then wait for it.
            if (!_is_fetching) {
                const size_t fetch_length = std::max<size_t>(request_length, _prefetch_size());
                RETURN_IF_ERROR(_launch_async_fetch_task(fetch_length));
            }
            _rpc_token->wait();
            CHECK(!_is_fetching);
            if (!_rpc_status.ok()) {
                return _rpc_status;
            }
        }
    }
    CHECK_EQ(request_length, 0);
    // Refill in the background once the buffered volume is below the low water mark.
    const bool need_prefetch = !_is_fetching && _current_volume < _low_water_level_mark();
    if (need_prefetch) {
        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
    }
    return Status::OK();
}
161
162
85
// Submits a task on `_rpc_token` that fetches `length` auto-increment values from
// the FE and, on success, appends the granted range to `_buffers`.
//
// `_is_fetching` is set to true before submission and reset to false by the task
// when it finishes, whether the fetch succeeded or failed.
//
// Returns the error from `submit_func` when the task could not be submitted,
// Status::OK() otherwise. The result of the fetch itself is not returned here;
// `_fetch_ids_from_fe` records it in `_rpc_status`.
Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
    _is_fetching = true;
    Status submit_status = _rpc_token->submit_func([=, this]() {
        auto&& res = _fetch_ids_from_fe(length);
        if (!res.has_value()) [[unlikely]] {
            auto&& err = res.error();
            LOG_WARNING(
                    "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
                    "values from fe, db_id={}, table_id={}, column_id={}, status={}",
                    _db_id, _table_id, _column_id, err);
            _is_fetching = false;
            return;
        }
        int64_t start = res.value();
        LOG_INFO(
                "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
                "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
                _db_id, _table_id, _column_id, start, length);
        {
            // `_latch` guards the buffer queue and its volume counter.
            std::lock_guard<std::mutex> lock {_latch};
            _buffers.emplace_back(start, length);
            _current_volume += length;
        }
        _is_fetching = false;
    });
    if (!submit_status.ok()) [[unlikely]] {
        // The task was rejected, so nothing will ever clear the flag (assumes a
        // failed submit_func never runs the task). Leaving it set would make the
        // next sync_request_ids() skip launching a fetch and then trip its
        // CHECK(!_is_fetching).
        _is_fetching = false;
        return submit_status;
    }
    return Status::OK();
}
189
190
} // namespace doris