be/src/exec/sink/autoinc_buffer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <list> |
20 | | |
21 | | #include "common/cast_set.h" |
22 | | #include "common/config.h" |
23 | | #include "common/factory_creator.h" |
24 | | #include "common/status.h" |
25 | | #include "util/threadpool.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | class VOlapTableSink; |
30 | | class OlapTableBlockConvertor; |
31 | | |
32 | | struct AutoIncIDAllocator { |
33 | 0 | int64_t next_id() { |
34 | 0 | DCHECK(!ids.empty()); |
35 | 0 | if (ids.front().second > 0) { |
36 | 0 | --ids.front().second; |
37 | 0 | --total_count; |
38 | 0 | return ids.front().first++; |
39 | 0 | } |
40 | 0 | ids.pop_front(); |
41 | 0 | DCHECK(!ids.empty() && ids.front().second > 0); |
42 | 0 | --ids.front().second; |
43 | 0 | --total_count; |
44 | 0 | return ids.front().first++; |
45 | 0 | } |
46 | | |
47 | 0 | void insert_ids(int64_t start, size_t length) { |
48 | 0 | total_count += length; |
49 | 0 | ids.emplace_back(start, length); |
50 | 0 | } |
51 | | |
52 | | size_t total_count {0}; |
53 | | std::list<std::pair<int64_t, size_t>> ids; |
54 | | }; |
55 | | |
56 | | class AutoIncIDBuffer { |
57 | | ENABLE_FACTORY_CREATOR(AutoIncIDBuffer); |
58 | | // GenericReader::_MIN_BATCH_SIZE = 4064 |
59 | | static constexpr size_t MIN_BATCH_SIZE = 4064; |
60 | | |
61 | | public: |
62 | | // all public functions are thread safe |
63 | | AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id); |
64 | | void set_batch_size_at_least(size_t batch_size); |
65 | | Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result); |
66 | | |
67 | | struct AutoIncRange { |
68 | | int64_t start; |
69 | | size_t length; |
70 | | |
71 | 5 | bool empty() const { return length == 0; } |
72 | | |
73 | 5 | void consume(size_t l) { |
74 | 5 | start += l; |
75 | 5 | length -= l; |
76 | 5 | } |
77 | | }; |
78 | | |
79 | | #ifdef BE_TEST |
80 | 5 | void append_range_for_test(int64_t start, size_t length) { |
81 | 5 | std::lock_guard<std::mutex> lock {_latch}; |
82 | 5 | _buffers.emplace_back(AutoIncRange {start, length}); |
83 | 5 | _current_volume += length; |
84 | 5 | } |
85 | | #endif |
86 | | |
87 | | private: |
88 | 0 | [[nodiscard]] size_t _prefetch_size() const { |
89 | 0 | return _batch_size * config::auto_inc_prefetch_size_ratio; |
90 | 0 | } |
91 | | |
92 | 0 | [[nodiscard]] size_t _low_water_level_mark() const { |
93 | 0 | return _batch_size * config::auto_inc_low_water_level_mark_size_ratio; |
94 | 0 | }; |
95 | | |
96 | | void _get_autoinc_ranges_from_buffers(size_t& request_length, |
97 | | std::vector<std::pair<int64_t, size_t>>* result); |
98 | | |
99 | | Status _launch_async_fetch_task(size_t length); |
100 | | |
101 | | Result<int64_t> _fetch_ids_from_fe(size_t length); |
102 | | |
103 | | std::atomic<size_t> _batch_size {MIN_BATCH_SIZE}; |
104 | | |
105 | | int64_t _db_id; |
106 | | int64_t _table_id; |
107 | | int64_t _column_id; |
108 | | |
109 | | std::unique_ptr<ThreadPoolToken> _rpc_token; |
110 | | Status _rpc_status {Status::OK()}; |
111 | | |
112 | | std::atomic<bool> _is_fetching {false}; |
113 | | |
114 | | std::mutex _mutex; |
115 | | |
116 | | mutable std::mutex _latch; |
117 | | std::list<AutoIncRange> _buffers; |
118 | | size_t _current_volume {0}; |
119 | | }; |
120 | | |
121 | | class GlobalAutoIncBuffers { |
122 | | public: |
123 | 5 | static GlobalAutoIncBuffers* GetInstance() { |
124 | 5 | static GlobalAutoIncBuffers buffers; |
125 | 5 | return &buffers; |
126 | 5 | } |
127 | | |
128 | 1 | GlobalAutoIncBuffers() { |
129 | 1 | static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor") |
130 | 1 | .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
131 | 1 | .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
132 | 1 | .set_max_queue_size(std::numeric_limits<int>::max()) |
133 | 1 | .build(&_fetch_autoinc_id_executor)); |
134 | 1 | } |
135 | 1 | ~GlobalAutoIncBuffers() = default; |
136 | | |
137 | 5 | std::unique_ptr<ThreadPoolToken> create_token() { |
138 | 5 | return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT); |
139 | 5 | } |
140 | | |
141 | | std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id, |
142 | 0 | int64_t column_id) { |
143 | 0 | std::lock_guard<std::mutex> lock(_mutex); |
144 | 0 | auto key = std::make_tuple(db_id, table_id, column_id); |
145 | 0 | auto it = _buffers.find(key); |
146 | 0 | if (it == _buffers.end()) { |
147 | 0 | _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id)); |
148 | 0 | } |
149 | 0 | return _buffers[{db_id, table_id, column_id}]; |
150 | 0 | } |
151 | | |
152 | | private: |
153 | | std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor; |
154 | | std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers; |
155 | | std::mutex _mutex; |
156 | | }; |
157 | | |
158 | | } // namespace doris |