Coverage Report

Created: 2026-05-19 12:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <list>
20
21
#include "common/cast_set.h"
22
#include "common/config.h"
23
#include "common/factory_creator.h"
24
#include "common/status.h"
25
#include "util/threadpool.h"
26
27
namespace doris {
28
29
class VOlapTableSink;
30
class OlapTableBlockConvertor;
31
32
// Hands out auto-increment ids one at a time from a FIFO queue of ranges.
//
// Each entry of `ids` is a (next id, remaining count) pair appended by
// insert_ids(); ids are consumed from the front entry first.
struct AutoIncIDAllocator {
    // Returns the next unused id and removes it from the pool.
    //
    // Precondition: at least one id is still available (total_count > 0).
    // Entries whose remaining count is zero -- fully consumed ranges as well as
    // ranges that were inserted with length 0 -- are discarded first, so any
    // number of consecutive empty entries at the front is tolerated.
    int64_t next_id() {
        // Skipping only a single empty entry is not enough: a following empty
        // entry would make the decrement below wrap the unsigned counter.
        while (!ids.empty() && ids.front().second == 0) {
            ids.pop_front();
        }
        DCHECK(!ids.empty());

        auto& [next, remaining] = ids.front();
        --remaining;
        --total_count;
        // Post-increment: return the current id and advance the range.
        return next++;
    }

    // Appends the range [start, start + length) to the back of the queue and
    // adds `length` to the number of available ids.
    void insert_ids(int64_t start, size_t length) {
        total_count += length;
        ids.emplace_back(start, length);
    }

    // Number of ids that can still be returned by next_id().
    size_t total_count {0};
    // Pending ranges as (next id to hand out, ids remaining in the range).
    std::list<std::pair<int64_t, size_t>> ids;
};
55
56
class AutoIncIDBuffer {
57
    ENABLE_FACTORY_CREATOR(AutoIncIDBuffer);
58
    // GenericReader::_MIN_BATCH_SIZE = 4064
59
    static constexpr size_t MIN_BATCH_SIZE = 4064;
60
61
public:
62
    // all public functions are thread safe
63
    AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
64
    void set_batch_size_at_least(size_t batch_size);
65
    Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result);
66
67
    struct AutoIncRange {
68
        int64_t start;
69
        size_t length;
70
71
5
        bool empty() const { return length == 0; }
72
73
5
        void consume(size_t l) {
74
5
            start += l;
75
5
            length -= l;
76
5
        }
77
    };
78
79
#ifdef BE_TEST
80
5
    void append_range_for_test(int64_t start, size_t length) {
81
5
        std::lock_guard<std::mutex> lock {_latch};
82
5
        _buffers.emplace_back(AutoIncRange {start, length});
83
5
        _current_volume += length;
84
5
    }
85
#endif
86
87
private:
88
0
    [[nodiscard]] size_t _prefetch_size() const {
89
0
        return _batch_size * config::auto_inc_prefetch_size_ratio;
90
0
    }
91
92
0
    [[nodiscard]] size_t _low_water_level_mark() const {
93
0
        return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
94
0
    };
95
96
    void _get_autoinc_ranges_from_buffers(size_t& request_length,
97
                                          std::vector<std::pair<int64_t, size_t>>* result);
98
99
    Status _launch_async_fetch_task(size_t length);
100
101
    Result<int64_t> _fetch_ids_from_fe(size_t length);
102
103
    std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
104
105
    int64_t _db_id;
106
    int64_t _table_id;
107
    int64_t _column_id;
108
109
    std::unique_ptr<ThreadPoolToken> _rpc_token;
110
    Status _rpc_status {Status::OK()};
111
112
    std::atomic<bool> _is_fetching {false};
113
114
    std::mutex _mutex;
115
116
    mutable std::mutex _latch;
117
    std::list<AutoIncRange> _buffers;
118
    size_t _current_volume {0};
119
};
120
121
class GlobalAutoIncBuffers {
122
public:
123
5
    static GlobalAutoIncBuffers* GetInstance() {
124
5
        static GlobalAutoIncBuffers buffers;
125
5
        return &buffers;
126
5
    }
127
128
1
    GlobalAutoIncBuffers() {
129
1
        static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor")
130
1
                                  .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
131
1
                                  .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
132
1
                                  .set_max_queue_size(std::numeric_limits<int>::max())
133
1
                                  .build(&_fetch_autoinc_id_executor));
134
1
    }
135
1
    ~GlobalAutoIncBuffers() = default;
136
137
5
    std::unique_ptr<ThreadPoolToken> create_token() {
138
5
        return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT);
139
5
    }
140
141
    std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id,
142
0
                                                         int64_t column_id) {
143
0
        std::lock_guard<std::mutex> lock(_mutex);
144
0
        auto key = std::make_tuple(db_id, table_id, column_id);
145
0
        auto it = _buffers.find(key);
146
0
        if (it == _buffers.end()) {
147
0
            _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id));
148
0
        }
149
0
        return _buffers[{db_id, table_id, column_id}];
150
0
    }
151
152
private:
153
    std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor;
154
    std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers;
155
    std::mutex _mutex;
156
};
157
158
} // namespace doris