be/src/service/point_query_executor.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <assert.h> |
21 | | #include <butil/macros.h> |
22 | | #include <butil/time.h> |
23 | | #include <gen_cpp/Metrics_types.h> |
24 | | #include <parallel_hashmap/phmap.h> |
25 | | #include <stdint.h> |
26 | | #include <string.h> |
27 | | |
28 | | #include <algorithm> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <optional> |
32 | | #include <ostream> |
33 | | #include <string> |
34 | | #include <unordered_map> |
35 | | #include <utility> |
36 | | #include <vector> |
37 | | |
38 | | #include "butil/containers/doubly_buffered_data.h" |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "core/data_type_serde/data_type_serde.h" |
44 | | #include "exprs/vexpr_fwd.h" |
45 | | #include "runtime/descriptors.h" |
46 | | #include "runtime/exec_env.h" |
47 | | #include "runtime/runtime_profile.h" |
48 | | #include "storage/olap_common.h" |
49 | | #include "storage/rowset/rowset.h" |
50 | | #include "storage/tablet/tablet.h" |
51 | | #include "storage/utils.h" |
52 | | #include "util/lru_cache.h" |
53 | | #include "util/mysql_global.h" |
54 | | #include "util/slice.h" |
55 | | |
56 | | namespace doris { |
57 | | |
58 | | class PTabletKeyLookupRequest; |
59 | | class PTabletKeyLookupResponse; |
60 | | class RuntimeState; |
61 | | class TDescriptorTable; |
62 | | class TExpr; |
63 | | |
64 | | // For caching point lookup pre allocted blocks and exprs |
65 | | class Reusable { |
66 | | public: |
67 | | ~Reusable(); |
68 | | |
69 | 0 | bool is_expired(int64_t ttl_ms) const { |
70 | 0 | return butil::gettimeofday_ms() - _create_timestamp > ttl_ms; |
71 | 0 | } |
72 | | |
73 | | Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, |
74 | | const TQueryOptions& query_options, const TabletSchema& schema, |
75 | | size_t block_size = 1); |
76 | | |
77 | | std::unique_ptr<Block> get_block(); |
78 | | |
79 | 254 | const DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; } |
80 | | |
81 | 262 | const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const { |
82 | 262 | return _col_uid_to_idx; |
83 | 262 | } |
84 | | |
85 | 254 | const std::vector<std::string>& get_col_default_values() const { return _col_default_values; } |
86 | | |
87 | | // do not touch block after returned |
88 | | void return_block(std::unique_ptr<Block>& block); |
89 | | |
90 | 213k | TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); } |
91 | | |
92 | 509 | const VExprContextSPtrs& output_exprs() { return _output_exprs_ctxs; } |
93 | | |
94 | 447 | int32_t rs_column_uid() const { return _row_store_column_ids; } |
95 | | |
96 | 197 | const std::unordered_set<int32_t> missing_col_uids() const { return _missing_col_uids; } |
97 | | |
98 | 514 | const std::unordered_set<int32_t> include_col_uids() const { return _include_col_uids; } |
99 | | |
100 | 771 | RuntimeState* runtime_state() { return _runtime_state.get(); } |
101 | | |
102 | | // delete sign idx in block |
103 | 367 | int32_t delete_sign_idx() const { return _delete_sign_idx; } |
104 | | |
105 | | private: |
106 | | // caching TupleDescriptor, output_expr, etc... |
107 | | std::unique_ptr<RuntimeState> _runtime_state; |
108 | | DescriptorTbl* _desc_tbl = nullptr; |
109 | | std::mutex _block_mutex; |
110 | | // prevent from allocte too many tmp blocks |
111 | | std::vector<std::unique_ptr<Block>> _block_pool; |
112 | | VExprContextSPtrs _output_exprs_ctxs; |
113 | | int64_t _create_timestamp = 0; |
114 | | DataTypeSerDeSPtrs _data_type_serdes; |
115 | | std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx; |
116 | | std::vector<std::string> _col_default_values; |
117 | | // picked rowstore(column group) column unique id |
118 | | int32_t _row_store_column_ids = -1; |
119 | | // some column is missing in rowstore(column group), we need to fill them with column store values |
120 | | std::unordered_set<int32_t> _missing_col_uids; |
121 | | // included cids in rowstore(column group) |
122 | | std::unordered_set<int32_t> _include_col_uids; |
123 | | // delete sign idx in block |
124 | | int32_t _delete_sign_idx = -1; |
125 | | }; |
126 | | |
127 | | // RowCache is a LRU cache for row store |
128 | | class RowCache : public LRUCachePolicy { |
129 | | public: |
130 | | using LRUCachePolicy::insert; |
131 | | |
132 | | // The cache key for row lru cache |
133 | | struct RowCacheKey { |
134 | 305 | RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {} |
135 | | int64_t tablet_id; |
136 | | Slice key; |
137 | | |
138 | | // Encode to a flat binary which can be used as LRUCache's key |
139 | 308 | std::string encode() const { |
140 | 308 | std::string full_key; |
141 | 308 | full_key.reserve(sizeof(int64_t) + key.size); |
142 | 308 | const char* tid = reinterpret_cast<const char*>(&tablet_id); |
143 | 308 | full_key.append(tid, tid + sizeof(int64_t)); |
144 | 308 | full_key.append(key.data, key.size); |
145 | 308 | return full_key; |
146 | 308 | } |
147 | | }; |
148 | | |
149 | | class RowCacheValue : public LRUCacheValueBase { |
150 | | public: |
151 | 15 | ~RowCacheValue() override { free(cache_value); } |
152 | | char* cache_value; |
153 | | }; |
154 | | |
155 | | // A handle for RowCache entry. This class make it easy to handle |
156 | | // Cache entry. Users don't need to release the obtained cache entry. This |
157 | | // class will release the cache entry when it is destroyed. |
158 | | class CacheHandle { |
159 | | public: |
160 | 404 | CacheHandle() = default; |
161 | | CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) |
162 | 143 | : _cache(cache), _handle(handle) {} |
163 | 547 | ~CacheHandle() { |
164 | 547 | if (_handle != nullptr) { |
165 | 143 | _cache->release(_handle); |
166 | 143 | } |
167 | 547 | } |
168 | | |
169 | 0 | CacheHandle(CacheHandle&& other) noexcept { |
170 | 0 | std::swap(_cache, other._cache); |
171 | 0 | std::swap(_handle, other._handle); |
172 | 0 | } |
173 | | |
174 | 135 | CacheHandle& operator=(CacheHandle&& other) noexcept { |
175 | 135 | std::swap(_cache, other._cache); |
176 | 135 | std::swap(_handle, other._handle); |
177 | 135 | return *this; |
178 | 135 | } |
179 | | |
180 | 265 | bool valid() { return _cache != nullptr && _handle != nullptr; } |
181 | | |
182 | 0 | LRUCachePolicy* cache() const { return _cache; } |
183 | 135 | Slice data() const { |
184 | 135 | return {(char*)((RowCacheValue*)_cache->value(_handle))->cache_value, |
185 | 135 | reinterpret_cast<LRUHandle*>(_handle)->charge}; |
186 | 135 | } |
187 | | |
188 | | private: |
189 | | LRUCachePolicy* _cache = nullptr; |
190 | | Cache::Handle* _handle = nullptr; |
191 | | |
192 | | // Don't allow copy and assign |
193 | | DISALLOW_COPY_AND_ASSIGN(CacheHandle); |
194 | | }; |
195 | | |
196 | | // Create global instance of this class |
197 | | static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); |
198 | | |
199 | | static RowCache* instance(); |
200 | | |
201 | | // Lookup a row key from cache, |
202 | | // If the Row key is found, the cache entry will be written into handle. |
203 | | // CacheHandle will release cache entry to cache when it destructs |
204 | | // Return true if entry is found, otherwise return false. |
205 | | bool lookup(const RowCacheKey& key, CacheHandle* handle); |
206 | | |
207 | | // Insert a row with key into this cache. |
208 | | // This function is thread-safe, and when two clients insert two same key |
209 | | // concurrently, this function can assure that only one page is cached. |
210 | | // The in_memory page will have higher priority. |
211 | | void insert(const RowCacheKey& key, const Slice& data); |
212 | | |
213 | | // |
214 | | void erase(const RowCacheKey& key); |
215 | | |
216 | | private: |
217 | | static constexpr uint32_t kDefaultNumShards = 128; |
218 | | RowCache(int64_t capacity, int num_shards = kDefaultNumShards); |
219 | | }; |
220 | | |
221 | | // A cache used for prepare stmt. |
222 | | // One connection per stmt perf uuid |
223 | | class LookupConnectionCache : public LRUCachePolicy { |
224 | | public: |
225 | 351 | static LookupConnectionCache* instance() { |
226 | 351 | return ExecEnv::GetInstance()->get_lookup_connection_cache(); |
227 | 351 | } |
228 | | |
229 | | static LookupConnectionCache* create_global_instance(size_t capacity); |
230 | | |
231 | | private: |
232 | | friend class PointQueryExecutor; |
233 | | LookupConnectionCache(size_t capacity) |
234 | 19 | : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity, |
235 | 19 | LRUCacheType::NUMBER, config::tablet_lookup_cache_stale_sweep_time_sec, |
236 | 19 | /*num shards*/ 32, /*element count capacity */ 0, |
237 | 19 | /*enable prune*/ true, /*is lru-k*/ true) {} |
238 | | |
239 | 4.49k | static std::string encode_key(__int128_t cache_id) { |
240 | 4.49k | fmt::memory_buffer buffer; |
241 | 4.49k | fmt::format_to(buffer, "{}", cache_id); |
242 | 4.49k | return std::string(buffer.data(), buffer.size()); |
243 | 4.49k | } |
244 | | |
245 | 2.18k | void add(__int128_t cache_id, std::shared_ptr<Reusable> item) { |
246 | 2.18k | std::string key = encode_key(cache_id); |
247 | 2.18k | auto* value = new CacheValue; |
248 | 2.18k | value->item = item; |
249 | 2.18k | VLOG_DEBUG << "Add item mem" |
250 | 0 | << ", cache_capacity: " << get_capacity() << ", cache_usage: " << get_usage() |
251 | 0 | << ", mem_consum: " << mem_consumption(); |
252 | 2.18k | auto* lru_handle = insert(key, value, 1, sizeof(Reusable), CachePriority::NORMAL); |
253 | 2.18k | release(lru_handle); |
254 | 2.18k | } |
255 | | |
256 | 2.31k | std::shared_ptr<Reusable> get(__int128_t cache_id) { |
257 | 2.31k | std::string key = encode_key(cache_id); |
258 | 2.31k | auto* lru_handle = lookup(key); |
259 | 2.31k | if (lru_handle) { |
260 | 155 | Defer release([cache = this, lru_handle] { cache->release(lru_handle); }); |
261 | 155 | auto* value = (CacheValue*)(LRUCachePolicy::value(lru_handle)); |
262 | 155 | return value->item; |
263 | 155 | } |
264 | 2.15k | return nullptr; |
265 | 2.31k | } |
266 | | |
267 | | class CacheValue : public LRUCacheValueBase { |
268 | | public: |
269 | | ~CacheValue() override; |
270 | | std::shared_ptr<Reusable> item; |
271 | | }; |
272 | | }; |
273 | | |
274 | | struct Metrics { |
275 | | Metrics() |
276 | 263 | : init_ns(TUnit::TIME_NS), |
277 | 263 | init_key_ns(TUnit::TIME_NS), |
278 | 263 | lookup_key_ns(TUnit::TIME_NS), |
279 | 263 | lookup_data_ns(TUnit::TIME_NS), |
280 | 263 | output_data_ns(TUnit::TIME_NS), |
281 | 263 | load_segment_key_stage_ns(TUnit::TIME_NS), |
282 | 263 | load_segment_data_stage_ns(TUnit::TIME_NS) {} |
283 | | RuntimeProfile::Counter init_ns; |
284 | | RuntimeProfile::Counter init_key_ns; |
285 | | RuntimeProfile::Counter lookup_key_ns; |
286 | | RuntimeProfile::Counter lookup_data_ns; |
287 | | RuntimeProfile::Counter output_data_ns; |
288 | | RuntimeProfile::Counter load_segment_key_stage_ns; |
289 | | RuntimeProfile::Counter load_segment_data_stage_ns; |
290 | | OlapReaderStatistics read_stats; |
291 | | size_t row_cache_hits = 0; |
292 | | bool hit_lookup_cache = false; |
293 | | size_t result_data_bytes; |
294 | | }; |
295 | | |
296 | | // An util to do tablet lookup |
297 | | class PointQueryExecutor { |
298 | | public: |
299 | | ~PointQueryExecutor(); |
300 | | |
301 | | Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response); |
302 | | |
303 | | Status lookup_up(); |
304 | | |
305 | | void print_profile(); |
306 | | |
307 | 0 | const OlapReaderStatistics& read_stats() const { return _read_stats; } |
308 | | |
309 | | private: |
310 | | Status _init_keys(const PTabletKeyLookupRequest* request); |
311 | | |
312 | | Status _lookup_row_key(); |
313 | | |
314 | | Status _lookup_row_data(); |
315 | | |
316 | | Status _output_data(); |
317 | | |
318 | 187 | static void release_rowset(RowsetSharedPtr* r) { |
319 | 187 | if (r && *r) { |
320 | 187 | VLOG_DEBUG << "release rowset " << (*r)->rowset_id(); |
321 | 187 | (*r)->release(); |
322 | 187 | } |
323 | 187 | delete r; |
324 | 187 | } |
325 | | |
326 | | // Read context for each row |
327 | | struct RowReadContext { |
328 | 263 | RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {} |
329 | | std::string _primary_key; |
330 | | RowCache::CacheHandle _cached_row_data; |
331 | | std::optional<RowLocation> _row_location; |
332 | | // rowset will be aquired during read |
333 | | // and released after used |
334 | | std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr; |
335 | | }; |
336 | | |
337 | | PTabletKeyLookupResponse* _response = nullptr; |
338 | | BaseTabletSPtr _tablet; |
339 | | std::vector<RowReadContext> _row_read_ctxs; |
340 | | std::shared_ptr<Reusable> _reusable; |
341 | | std::unique_ptr<Block> _result_block; |
342 | | Metrics _profile_metrics; |
343 | | bool _binary_row_format = false; |
344 | | OlapReaderStatistics _read_stats; |
345 | | int32_t _row_hits = 0; |
346 | | // snapshot read version |
347 | | int64_t _version = -1; |
348 | | }; |
349 | | |
350 | | } // namespace doris |