be/src/exec/sink/autoinc_buffer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <list> |
20 | | |
21 | | #include "common/cast_set.h" |
22 | | #include "common/config.h" |
23 | | #include "common/factory_creator.h" |
24 | | #include "common/status.h" |
25 | | #include "util/threadpool.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | | |
30 | | class VOlapTableSink; |
31 | | class OlapTableBlockConvertor; |
32 | | |
// Hands out auto-increment IDs from a FIFO queue of pre-allocated ID ranges.
// Each entry of `ids` is (next id to hand out, number of ids still left in that range);
// `total_count` is the sum of the remaining counts across all queued ranges.
struct AutoIncIDAllocator {
    // Returns the next available ID and consumes it from the front-most non-empty range.
    // Precondition: at least one unconsumed ID is queued (total_count > 0).
    int64_t next_id() {
        DCHECK(!ids.empty());
        // Discard every exhausted range at the front. A range can be empty because it was
        // fully consumed OR because insert_ids() was called with length == 0, so more than
        // one front entry may have to be dropped before a usable one is found.
        while (!ids.empty() && ids.front().second == 0) {
            ids.pop_front();
        }
        DCHECK(!ids.empty());
        auto& [next, remaining] = ids.front();
        --remaining;
        --total_count;
        return next++;
    }

    // Appends the range [start, start + length) to the back of the queue.
    void insert_ids(int64_t start, size_t length) {
        total_count += length;
        ids.emplace_back(start, length);
    }

    size_t total_count {0};
    std::list<std::pair<int64_t, size_t>> ids;
};
56 | | |
57 | | class AutoIncIDBuffer { |
58 | | ENABLE_FACTORY_CREATOR(AutoIncIDBuffer); |
59 | | // GenericReader::_MIN_BATCH_SIZE = 4064 |
60 | | static constexpr size_t MIN_BATCH_SIZE = 4064; |
61 | | |
62 | | public: |
63 | | // all public functions are thread safe |
64 | | AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id); |
65 | | void set_batch_size_at_least(size_t batch_size); |
66 | | Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result); |
67 | | |
68 | | struct AutoIncRange { |
69 | | int64_t start; |
70 | | size_t length; |
71 | | |
72 | 4 | bool empty() const { return length == 0; } |
73 | | |
74 | 4 | void consume(size_t l) { |
75 | 4 | start += l; |
76 | 4 | length -= l; |
77 | 4 | } |
78 | | }; |
79 | | |
80 | | private: |
81 | 4 | [[nodiscard]] size_t _prefetch_size() const { |
82 | 4 | return _batch_size * config::auto_inc_prefetch_size_ratio; |
83 | 4 | } |
84 | | |
85 | 4 | [[nodiscard]] size_t _low_water_level_mark() const { |
86 | 4 | return _batch_size * config::auto_inc_low_water_level_mark_size_ratio; |
87 | 4 | }; |
88 | | |
89 | | void _get_autoinc_ranges_from_buffers(size_t& request_length, |
90 | | std::vector<std::pair<int64_t, size_t>>* result); |
91 | | |
92 | | Status _launch_async_fetch_task(size_t length); |
93 | | |
94 | | Result<int64_t> _fetch_ids_from_fe(size_t length); |
95 | | |
96 | | std::atomic<size_t> _batch_size {MIN_BATCH_SIZE}; |
97 | | |
98 | | int64_t _db_id; |
99 | | int64_t _table_id; |
100 | | int64_t _column_id; |
101 | | |
102 | | std::unique_ptr<ThreadPoolToken> _rpc_token; |
103 | | Status _rpc_status {Status::OK()}; |
104 | | |
105 | | std::atomic<bool> _is_fetching {false}; |
106 | | |
107 | | std::mutex _mutex; |
108 | | |
109 | | mutable std::mutex _latch; |
110 | | std::list<AutoIncRange> _buffers; |
111 | | size_t _current_volume {0}; |
112 | | }; |
113 | | |
114 | | class GlobalAutoIncBuffers { |
115 | | public: |
116 | 12 | static GlobalAutoIncBuffers* GetInstance() { |
117 | 12 | static GlobalAutoIncBuffers buffers; |
118 | 12 | return &buffers; |
119 | 12 | } |
120 | | |
121 | 2 | GlobalAutoIncBuffers() { |
122 | 2 | static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor") |
123 | 2 | .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
124 | 2 | .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num)) |
125 | 2 | .set_max_queue_size(std::numeric_limits<int>::max()) |
126 | 2 | .build(&_fetch_autoinc_id_executor)); |
127 | 2 | } |
128 | 1 | ~GlobalAutoIncBuffers() = default; |
129 | | |
130 | 4 | std::unique_ptr<ThreadPoolToken> create_token() { |
131 | 4 | return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT); |
132 | 4 | } |
133 | | |
134 | | std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id, |
135 | 8 | int64_t column_id) { |
136 | 8 | std::lock_guard<std::mutex> lock(_mutex); |
137 | 8 | auto key = std::make_tuple(db_id, table_id, column_id); |
138 | 8 | auto it = _buffers.find(key); |
139 | 8 | if (it == _buffers.end()) { |
140 | 4 | _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id)); |
141 | 4 | } |
142 | 8 | return _buffers[{db_id, table_id, column_id}]; |
143 | 8 | } |
144 | | |
145 | | private: |
146 | | std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor; |
147 | | std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers; |
148 | | std::mutex _mutex; |
149 | | }; |
150 | | |
151 | | } // namespace doris |
152 | | #include "common/compile_check_end.h" |