be/src/exec/sink/autoinc_buffer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <list> |
20 | | |
21 | | #include "common/cast_set.h" |
22 | | #include "common/config.h" |
23 | | #include "common/factory_creator.h" |
24 | | #include "common/status.h" |
25 | | #include "util/threadpool.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | class VOlapTableSink; |
30 | | class OlapTableBlockConvertor; |
31 | | |
// Hands out auto-increment ids one at a time from a FIFO queue of id ranges.
//
// Each entry of `ids` is a pair (next id to hand out, ids still remaining in
// that range). `total_count` tracks the sum of the remaining counts over all
// queued ranges.
struct AutoIncIDAllocator {
    // Returns the next unused id and removes it from the pool.
    //
    // Precondition: at least one id is still available (total_count > 0);
    // this is checked with DCHECK.
    int64_t next_id() {
        // Discard every exhausted range at the head of the queue, not just one.
        // A range becomes empty after its last id was handed out, and
        // insert_ids() may also enqueue a zero-length range; consuming from
        // such a range would wrap its unsigned counter and total_count.
        while (!ids.empty() && ids.front().second == 0) {
            ids.pop_front();
        }
        DCHECK(!ids.empty());

        auto& [next, remaining] = ids.front();
        --remaining;
        --total_count;
        return next++;
    }

    // Appends the range [start, start + length) to the back of the queue and
    // accounts for its ids in total_count.
    void insert_ids(int64_t start, size_t length) {
        total_count += length;
        ids.emplace_back(start, length);
    }

    // Number of ids that can still be handed out across all queued ranges.
    size_t total_count {0};
    // Queued ranges as (next id, remaining count), consumed front to back.
    std::list<std::pair<int64_t, size_t>> ids;
};
55 | | |
56 | | class AutoIncIDBuffer { |
57 | | ENABLE_FACTORY_CREATOR(AutoIncIDBuffer); |
58 | | // GenericReader::_MIN_BATCH_SIZE = 4064 |
59 | | static constexpr size_t MIN_BATCH_SIZE = 4064; |
60 | | |
61 | | public: |
62 | | // all public functions are thread safe |
63 | | AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id); |
64 | | void set_batch_size_at_least(size_t batch_size); |
65 | | Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result); |
66 | | |
67 | | struct AutoIncRange { |
68 | | int64_t start; |
69 | | size_t length; |
70 | | |
71 | 188 | bool empty() const { return length == 0; } |
72 | | |
73 | 188 | void consume(size_t l) { |
74 | 188 | start += l; |
75 | 188 | length -= l; |
76 | 188 | } |
77 | | }; |
78 | | |
79 | | private: |
80 | 85 | [[nodiscard]] size_t _prefetch_size() const { |
81 | 85 | return _batch_size * config::auto_inc_prefetch_size_ratio; |
82 | 85 | } |
83 | | |
84 | 193 | [[nodiscard]] size_t _low_water_level_mark() const { |
85 | 193 | return _batch_size * config::auto_inc_low_water_level_mark_size_ratio; |
86 | 193 | }; |
87 | | |
88 | | void _get_autoinc_ranges_from_buffers(size_t& request_length, |
89 | | std::vector<std::pair<int64_t, size_t>>* result); |
90 | | |
91 | | Status _launch_async_fetch_task(size_t length); |
92 | | |
93 | | Result<int64_t> _fetch_ids_from_fe(size_t length); |
94 | | |
95 | | std::atomic<size_t> _batch_size {MIN_BATCH_SIZE}; |
96 | | |
97 | | int64_t _db_id; |
98 | | int64_t _table_id; |
99 | | int64_t _column_id; |
100 | | |
101 | | std::unique_ptr<ThreadPoolToken> _rpc_token; |
102 | | Status _rpc_status {Status::OK()}; |
103 | | |
104 | | std::atomic<bool> _is_fetching {false}; |
105 | | |
106 | | std::mutex _mutex; |
107 | | |
108 | | mutable std::mutex _latch; |
109 | | std::list<AutoIncRange> _buffers; |
110 | | size_t _current_volume {0}; |
111 | | }; |
112 | | |
113 | | class GlobalAutoIncBuffers { |
114 | | public: |
115 | 462 | static GlobalAutoIncBuffers* GetInstance() { |
116 | 462 | static GlobalAutoIncBuffers buffers; |
117 | 462 | return &buffers; |
118 | 462 | } |
119 | | |
120 | 3 | GlobalAutoIncBuffers() { |
121 | 3 | static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor") |
122 | 3 | .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
123 | 3 | .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
124 | 3 | .set_max_queue_size(std::numeric_limits<int>::max()) |
125 | 3 | .build(&_fetch_autoinc_id_executor)); |
126 | 3 | } |
127 | 1 | ~GlobalAutoIncBuffers() = default; |
128 | | |
129 | 108 | std::unique_ptr<ThreadPoolToken> create_token() { |
130 | 108 | return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT); |
131 | 108 | } |
132 | | |
133 | | std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id, |
134 | 355 | int64_t column_id) { |
135 | 355 | std::lock_guard<std::mutex> lock(_mutex); |
136 | 355 | auto key = std::make_tuple(db_id, table_id, column_id); |
137 | 355 | auto it = _buffers.find(key); |
138 | 355 | if (it == _buffers.end()) { |
139 | 108 | _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id)); |
140 | 108 | } |
141 | 355 | return _buffers[{db_id, table_id, column_id}]; |
142 | 355 | } |
143 | | |
144 | | private: |
145 | | std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor; |
146 | | std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers; |
147 | | std::mutex _mutex; |
148 | | }; |
149 | | |
150 | | } // namespace doris |