Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/query_cache/query_cache.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <charconv>
#include <memory>
#include <roaring/roaring.hh>
#include <string>
#include <system_error>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "core/block/block.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "util/lru_cache.h"
#include "util/slice.h"
#include "util/time.h"
41
42
namespace doris {
43
44
// A list of blocks (BlockUPtr) that together form one cached query mid-result;
// QueryCache::CacheValue stores one of these per cache entry.
using CacheResult = std::vector<BlockUPtr>;
45
// A handle for mid-result from query lru cache.
46
// The handle will automatically release the cache entry when it is destroyed.
47
// So the caller need to make sure the handle is valid in lifecycle.
48
class QueryCacheHandle {
49
public:
50
5
    QueryCacheHandle() = default;
51
    QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
52
5
            : _cache(cache), _handle(handle) {}
53
54
11
    ~QueryCacheHandle() {
55
11
        if (_handle != nullptr) {
56
5
            CHECK(_cache != nullptr);
57
5
            {
58
5
                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
59
5
                        ExecEnv::GetInstance()->query_cache_mem_tracker());
60
5
                _cache->release(_handle);
61
5
            }
62
5
        }
63
11
    }
64
65
1
    QueryCacheHandle(QueryCacheHandle&& other) noexcept {
66
1
        std::swap(_cache, other._cache);
67
1
        std::swap(_handle, other._handle);
68
1
    }
69
70
3
    QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept {
71
3
        std::swap(_cache, other._cache);
72
3
        std::swap(_handle, other._handle);
73
3
        return *this;
74
3
    }
75
76
    std::vector<int>* get_cache_slot_orders();
77
78
    CacheResult* get_cache_result();
79
80
    int64_t get_cache_version();
81
82
private:
83
    LRUCachePolicy* _cache = nullptr;
84
    Cache::Handle* _handle = nullptr;
85
86
    // Don't allow copy and assign
87
    DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle);
88
};
89
90
class QueryCache : public LRUCachePolicy {
91
public:
92
    using LRUCachePolicy::insert;
93
94
    struct CacheValue : public LRUCacheValueBase {
95
        int64_t version;
96
        CacheResult result;
97
        std::vector<int> slot_orders;
98
99
        CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so)
100
3
                : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {}
101
    };
102
103
    // Create global instance of this class
104
5
    static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
105
5
        auto* res = new QueryCache(capacity, num_shards);
106
5
        return res;
107
5
    }
108
109
    static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges,
110
                                  const TQueryCacheParam& cache_param, std::string* cache_key,
111
12
                                  int64_t* version) {
112
12
        if (scan_ranges.empty()) {
113
1
            return Status::InternalError("scan_ranges is empty, plan error");
114
1
        }
115
116
11
        std::string digest;
117
11
        try {
118
11
            digest = cache_param.digest;
119
11
        } catch (const std::exception&) {
120
0
            return Status::InternalError("digest is invalid, plan error");
121
0
        }
122
11
        if (digest.empty()) {
123
0
            return Status::InternalError("digest is empty, plan error");
124
0
        }
125
126
11
        if (cache_param.tablet_to_range.empty()) {
127
1
            return Status::InternalError("tablet_to_range is empty, plan error");
128
1
        }
129
130
10
        std::vector<int64_t> tablet_ids;
131
10
        tablet_ids.reserve(scan_ranges.size());
132
16
        for (const auto& scan_range : scan_ranges) {
133
16
            auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
134
16
            tablet_ids.push_back(tablet_id);
135
16
        }
136
10
        std::sort(tablet_ids.begin(), tablet_ids.end());
137
138
10
        int64_t first_version = -1;
139
10
        std::string first_tablet_range;
140
22
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
141
16
            auto tablet_id = tablet_ids[i];
142
143
16
            auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
144
16
            if (find_tablet == cache_param.tablet_to_range.end()) {
145
1
                return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
146
1
            }
147
148
15
            auto scan_range_iter =
149
15
                    std::find_if(scan_ranges.begin(), scan_ranges.end(),
150
21
                                 [&tablet_id](const TScanRangeParams& range) {
151
21
                                     return range.scan_range.palo_scan_range.tablet_id == tablet_id;
152
21
                                 });
153
15
            int64_t current_version = -1;
154
15
            std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
155
15
                            scan_range_iter->scan_range.palo_scan_range.version.data() +
156
15
                                    scan_range_iter->scan_range.palo_scan_range.version.size(),
157
15
                            current_version);
158
159
15
            if (i == 0) {
160
10
                first_version = current_version;
161
10
                first_tablet_range = find_tablet->second;
162
10
            } else {
163
5
                if (current_version != first_version) {
164
1
                    return Status::InternalError(
165
1
                            "All tablets in one instance must have the same version, plan error");
166
1
                }
167
4
                if (find_tablet->second != first_tablet_range) {
168
2
                    return Status::InternalError(
169
2
                            "All tablets in one instance must have the same tablet_to_range, plan "
170
2
                            "error");
171
2
                }
172
4
            }
173
15
        }
174
175
6
        *version = first_version;
176
177
6
        *cache_key = digest;
178
8
        for (auto tablet_id : tablet_ids) {
179
8
            *cache_key += std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id));
180
8
        }
181
6
        *cache_key += first_tablet_range;
182
183
6
        return Status::OK();
184
10
    }
185
186
    // Return global instance.
187
    // Client should call create_global_cache before.
188
3
    static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); }
189
190
    QueryCache() = delete;
191
192
    QueryCache(size_t capacity, uint32_t num_shards)
193
5
            : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE,
194
5
                             3600 * 24, /*num_shards*/ num_shards,
195
5
                             /*element_count_capacity*/ 0, /*enable_prune*/ true,
196
5
                             /*is_lru_k*/ true) {}
197
198
    bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle);
199
200
    void insert(const CacheKey& key, int64_t version, CacheResult& result,
201
                const std::vector<int>& solt_orders, int64_t cache_size);
202
};
203
} // namespace doris