be/src/runtime/query_cache/query_cache.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <butil/macros.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <charconv>
#include <exception>
#include <memory>
#include <roaring/roaring.hh>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "core/block/block.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "util/lru_cache.h"
#include "util/slice.h"
#include "util/time.h"
41 | | |
42 | | namespace doris { |
43 | | |
44 | | using CacheResult = std::vector<BlockUPtr>; |
45 | | // A handle for mid-result from query lru cache. |
46 | | // The handle will automatically release the cache entry when it is destroyed. |
47 | | // So the caller need to make sure the handle is valid in lifecycle. |
48 | | class QueryCacheHandle { |
49 | | public: |
50 | 735 | QueryCacheHandle() = default; |
51 | | QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle) |
52 | 421 | : _cache(cache), _handle(handle) {} |
53 | | |
54 | 1.15k | ~QueryCacheHandle() { |
55 | 1.15k | if (_handle != nullptr) { |
56 | 420 | CHECK(_cache != nullptr); |
57 | 420 | { |
58 | 420 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
59 | 420 | ExecEnv::GetInstance()->query_cache_mem_tracker()); |
60 | 420 | _cache->release(_handle); |
61 | 420 | } |
62 | 420 | } |
63 | 1.15k | } |
64 | | |
65 | | QueryCacheHandle(QueryCacheHandle&& other) noexcept { |
66 | | std::swap(_cache, other._cache); |
67 | | std::swap(_handle, other._handle); |
68 | | } |
69 | | |
70 | 103 | QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept { |
71 | 103 | std::swap(_cache, other._cache); |
72 | 103 | std::swap(_handle, other._handle); |
73 | 103 | return *this; |
74 | 103 | } |
75 | | |
76 | | std::vector<int>* get_cache_slot_orders(); |
77 | | |
78 | | CacheResult* get_cache_result(); |
79 | | |
80 | | int64_t get_cache_version(); |
81 | | |
82 | | private: |
83 | | LRUCachePolicy* _cache = nullptr; |
84 | | Cache::Handle* _handle = nullptr; |
85 | | |
86 | | // Don't allow copy and assign |
87 | | DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle); |
88 | | }; |
89 | | |
90 | | class QueryCache : public LRUCachePolicy { |
91 | | public: |
92 | | using LRUCachePolicy::insert; |
93 | | |
94 | | struct CacheValue : public LRUCacheValueBase { |
95 | | int64_t version; |
96 | | CacheResult result; |
97 | | std::vector<int> slot_orders; |
98 | | |
99 | | CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so) |
100 | 319 | : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {} |
101 | | }; |
102 | | |
103 | | // Create global instance of this class |
104 | 12 | static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) { |
105 | 12 | auto* res = new QueryCache(capacity, num_shards); |
106 | 12 | return res; |
107 | 12 | } |
108 | | |
109 | | static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges, |
110 | | const TQueryCacheParam& cache_param, std::string* cache_key, |
111 | 740 | int64_t* version) { |
112 | 740 | if (scan_ranges.empty()) { |
113 | 1 | return Status::InternalError("scan_ranges is empty, plan error"); |
114 | 1 | } |
115 | | |
116 | 739 | std::string digest; |
117 | 739 | try { |
118 | 739 | digest = cache_param.digest; |
119 | 739 | } catch (const std::exception&) { |
120 | 0 | return Status::InternalError("digest is invalid, plan error"); |
121 | 0 | } |
122 | 741 | if (digest.empty()) { |
123 | 0 | return Status::InternalError("digest is empty, plan error"); |
124 | 0 | } |
125 | | |
126 | 741 | if (cache_param.tablet_to_range.empty()) { |
127 | 1 | return Status::InternalError("tablet_to_range is empty, plan error"); |
128 | 1 | } |
129 | | |
130 | 740 | std::vector<int64_t> tablet_ids; |
131 | 740 | tablet_ids.reserve(scan_ranges.size()); |
132 | 746 | for (const auto& scan_range : scan_ranges) { |
133 | 746 | auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id; |
134 | 746 | tablet_ids.push_back(tablet_id); |
135 | 746 | } |
136 | 740 | std::sort(tablet_ids.begin(), tablet_ids.end()); |
137 | | |
138 | 740 | int64_t first_version = -1; |
139 | 740 | std::string first_tablet_range; |
140 | 1.48k | for (size_t i = 0; i < tablet_ids.size(); ++i) { |
141 | 748 | auto tablet_id = tablet_ids[i]; |
142 | | |
143 | 748 | auto find_tablet = cache_param.tablet_to_range.find(tablet_id); |
144 | 748 | if (find_tablet == cache_param.tablet_to_range.end()) { |
145 | 1 | return Status::InternalError("Not find tablet in partition_to_tablets, plan error"); |
146 | 1 | } |
147 | | |
148 | 747 | auto scan_range_iter = |
149 | 747 | std::find_if(scan_ranges.begin(), scan_ranges.end(), |
150 | 752 | [&tablet_id](const TScanRangeParams& range) { |
151 | 752 | return range.scan_range.palo_scan_range.tablet_id == tablet_id; |
152 | 752 | }); |
153 | 747 | int64_t current_version = -1; |
154 | 747 | std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(), |
155 | 747 | scan_range_iter->scan_range.palo_scan_range.version.data() + |
156 | 747 | scan_range_iter->scan_range.palo_scan_range.version.size(), |
157 | 747 | current_version); |
158 | | |
159 | 747 | if (i == 0) { |
160 | 742 | first_version = current_version; |
161 | 742 | first_tablet_range = find_tablet->second; |
162 | 742 | } else { |
163 | 5 | if (current_version != first_version) { |
164 | 1 | return Status::InternalError( |
165 | 1 | "All tablets in one instance must have the same version, plan error"); |
166 | 1 | } |
167 | 4 | if (find_tablet->second != first_tablet_range) { |
168 | 2 | return Status::InternalError( |
169 | 2 | "All tablets in one instance must have the same tablet_to_range, plan " |
170 | 2 | "error"); |
171 | 2 | } |
172 | 4 | } |
173 | 747 | } |
174 | | |
175 | 736 | *version = first_version; |
176 | | |
177 | 736 | *cache_key = digest; |
178 | 740 | for (auto tablet_id : tablet_ids) { |
179 | 740 | *cache_key += std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id)); |
180 | 740 | } |
181 | 736 | *cache_key += first_tablet_range; |
182 | | |
183 | 736 | return Status::OK(); |
184 | 740 | } |
185 | | |
186 | | // Return global instance. |
187 | | // Client should call create_global_cache before. |
188 | 735 | static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); } |
189 | | |
190 | | QueryCache() = delete; |
191 | | |
192 | | QueryCache(size_t capacity, uint32_t num_shards) |
193 | 12 | : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE, |
194 | 12 | 3600 * 24, /*num_shards*/ num_shards, |
195 | 12 | /*element_count_capacity*/ 0, /*enable_prune*/ true, |
196 | 12 | /*is_lru_k*/ true) {} |
197 | | |
198 | | bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle); |
199 | | |
200 | | void insert(const CacheKey& key, int64_t version, CacheResult& result, |
201 | | const std::vector<int>& solt_orders, int64_t cache_size); |
202 | | }; |
203 | | } // namespace doris |