Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/point_query_executor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <assert.h>
#include <butil/macros.h>
#include <butil/time.h>
#include <gen_cpp/Metrics_types.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>
#include <string.h>

#include <algorithm>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "butil/containers/doubly_buffered_data.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "core/block/block.h"
#include "core/data_type_serde/data_type_serde.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_profile.h"
#include "storage/olap_common.h"
#include "storage/rowset/rowset.h"
#include "storage/tablet/tablet.h"
#include "storage/utils.h"
#include "util/lru_cache.h"
#include "util/mysql_global.h"
#include "util/slice.h"
55
56
namespace doris {
57
58
class PTabletKeyLookupRequest;
59
class PTabletKeyLookupResponse;
60
class RuntimeState;
61
class TDescriptorTable;
62
class TExpr;
63
64
// For caching point lookup pre allocted blocks and exprs
65
class Reusable {
66
public:
67
    ~Reusable();
68
69
0
    bool is_expired(int64_t ttl_ms) const {
70
0
        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
71
0
    }
72
73
    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
74
                const TQueryOptions& query_options, const TabletSchema& schema,
75
                size_t block_size = 1);
76
77
    std::unique_ptr<Block> get_block();
78
79
254
    const DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; }
80
81
262
    const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const {
82
262
        return _col_uid_to_idx;
83
262
    }
84
85
254
    const std::vector<std::string>& get_col_default_values() const { return _col_default_values; }
86
87
    // do not touch block after returned
88
    void return_block(std::unique_ptr<Block>& block);
89
90
213k
    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }
91
92
509
    const VExprContextSPtrs& output_exprs() { return _output_exprs_ctxs; }
93
94
447
    int32_t rs_column_uid() const { return _row_store_column_ids; }
95
96
197
    const std::unordered_set<int32_t> missing_col_uids() const { return _missing_col_uids; }
97
98
514
    const std::unordered_set<int32_t> include_col_uids() const { return _include_col_uids; }
99
100
771
    RuntimeState* runtime_state() { return _runtime_state.get(); }
101
102
    // delete sign idx in block
103
367
    int32_t delete_sign_idx() const { return _delete_sign_idx; }
104
105
private:
106
    // caching TupleDescriptor, output_expr, etc...
107
    std::unique_ptr<RuntimeState> _runtime_state;
108
    DescriptorTbl* _desc_tbl = nullptr;
109
    std::mutex _block_mutex;
110
    // prevent from allocte too many tmp blocks
111
    std::vector<std::unique_ptr<Block>> _block_pool;
112
    VExprContextSPtrs _output_exprs_ctxs;
113
    int64_t _create_timestamp = 0;
114
    DataTypeSerDeSPtrs _data_type_serdes;
115
    std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
116
    std::vector<std::string> _col_default_values;
117
    // picked rowstore(column group) column unique id
118
    int32_t _row_store_column_ids = -1;
119
    // some column is missing in rowstore(column group), we need to fill them with column store values
120
    std::unordered_set<int32_t> _missing_col_uids;
121
    // included cids in rowstore(column group)
122
    std::unordered_set<int32_t> _include_col_uids;
123
    // delete sign idx in block
124
    int32_t _delete_sign_idx = -1;
125
};
126
127
// RowCache is a LRU cache for row store
128
class RowCache : public LRUCachePolicy {
129
public:
130
    using LRUCachePolicy::insert;
131
132
    // The cache key for row lru cache
133
    struct RowCacheKey {
134
305
        RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {}
135
        int64_t tablet_id;
136
        Slice key;
137
138
        // Encode to a flat binary which can be used as LRUCache's key
139
308
        std::string encode() const {
140
308
            std::string full_key;
141
308
            full_key.reserve(sizeof(int64_t) + key.size);
142
308
            const char* tid = reinterpret_cast<const char*>(&tablet_id);
143
308
            full_key.append(tid, tid + sizeof(int64_t));
144
308
            full_key.append(key.data, key.size);
145
308
            return full_key;
146
308
        }
147
    };
148
149
    class RowCacheValue : public LRUCacheValueBase {
150
    public:
151
15
        ~RowCacheValue() override { free(cache_value); }
152
        char* cache_value;
153
    };
154
155
    // A handle for RowCache entry. This class make it easy to handle
156
    // Cache entry. Users don't need to release the obtained cache entry. This
157
    // class will release the cache entry when it is destroyed.
158
    class CacheHandle {
159
    public:
160
404
        CacheHandle() = default;
161
        CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
162
143
                : _cache(cache), _handle(handle) {}
163
547
        ~CacheHandle() {
164
547
            if (_handle != nullptr) {
165
143
                _cache->release(_handle);
166
143
            }
167
547
        }
168
169
0
        CacheHandle(CacheHandle&& other) noexcept {
170
0
            std::swap(_cache, other._cache);
171
0
            std::swap(_handle, other._handle);
172
0
        }
173
174
135
        CacheHandle& operator=(CacheHandle&& other) noexcept {
175
135
            std::swap(_cache, other._cache);
176
135
            std::swap(_handle, other._handle);
177
135
            return *this;
178
135
        }
179
180
265
        bool valid() { return _cache != nullptr && _handle != nullptr; }
181
182
0
        LRUCachePolicy* cache() const { return _cache; }
183
135
        Slice data() const {
184
135
            return {(char*)((RowCacheValue*)_cache->value(_handle))->cache_value,
185
135
                    reinterpret_cast<LRUHandle*>(_handle)->charge};
186
135
        }
187
188
    private:
189
        LRUCachePolicy* _cache = nullptr;
190
        Cache::Handle* _handle = nullptr;
191
192
        // Don't allow copy and assign
193
        DISALLOW_COPY_AND_ASSIGN(CacheHandle);
194
    };
195
196
    // Create global instance of this class
197
    static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards);
198
199
    static RowCache* instance();
200
201
    // Lookup a row key from cache,
202
    // If the Row key is found, the cache entry will be written into handle.
203
    // CacheHandle will release cache entry to cache when it destructs
204
    // Return true if entry is found, otherwise return false.
205
    bool lookup(const RowCacheKey& key, CacheHandle* handle);
206
207
    // Insert a row with key into this cache.
208
    // This function is thread-safe, and when two clients insert two same key
209
    // concurrently, this function can assure that only one page is cached.
210
    // The in_memory page will have higher priority.
211
    void insert(const RowCacheKey& key, const Slice& data);
212
213
    //
214
    void erase(const RowCacheKey& key);
215
216
private:
217
    static constexpr uint32_t kDefaultNumShards = 128;
218
    RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
219
};
220
221
// A cache used for prepare stmt.
222
// One connection per stmt perf uuid
223
class LookupConnectionCache : public LRUCachePolicy {
224
public:
225
351
    static LookupConnectionCache* instance() {
226
351
        return ExecEnv::GetInstance()->get_lookup_connection_cache();
227
351
    }
228
229
    static LookupConnectionCache* create_global_instance(size_t capacity);
230
231
private:
232
    friend class PointQueryExecutor;
233
    LookupConnectionCache(size_t capacity)
234
19
            : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity,
235
19
                             LRUCacheType::NUMBER, config::tablet_lookup_cache_stale_sweep_time_sec,
236
19
                             /*num shards*/ 32, /*element count capacity */ 0,
237
19
                             /*enable prune*/ true, /*is lru-k*/ true) {}
238
239
4.49k
    static std::string encode_key(__int128_t cache_id) {
240
4.49k
        fmt::memory_buffer buffer;
241
4.49k
        fmt::format_to(buffer, "{}", cache_id);
242
4.49k
        return std::string(buffer.data(), buffer.size());
243
4.49k
    }
244
245
2.18k
    void add(__int128_t cache_id, std::shared_ptr<Reusable> item) {
246
2.18k
        std::string key = encode_key(cache_id);
247
2.18k
        auto* value = new CacheValue;
248
2.18k
        value->item = item;
249
2.18k
        VLOG_DEBUG << "Add item mem"
250
0
                   << ", cache_capacity: " << get_capacity() << ", cache_usage: " << get_usage()
251
0
                   << ", mem_consum: " << mem_consumption();
252
2.18k
        auto* lru_handle = insert(key, value, 1, sizeof(Reusable), CachePriority::NORMAL);
253
2.18k
        release(lru_handle);
254
2.18k
    }
255
256
2.31k
    std::shared_ptr<Reusable> get(__int128_t cache_id) {
257
2.31k
        std::string key = encode_key(cache_id);
258
2.31k
        auto* lru_handle = lookup(key);
259
2.31k
        if (lru_handle) {
260
155
            Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
261
155
            auto* value = (CacheValue*)(LRUCachePolicy::value(lru_handle));
262
155
            return value->item;
263
155
        }
264
2.15k
        return nullptr;
265
2.31k
    }
266
267
    class CacheValue : public LRUCacheValueBase {
268
    public:
269
        ~CacheValue() override;
270
        std::shared_ptr<Reusable> item;
271
    };
272
};
273
274
struct Metrics {
275
    Metrics()
276
263
            : init_ns(TUnit::TIME_NS),
277
263
              init_key_ns(TUnit::TIME_NS),
278
263
              lookup_key_ns(TUnit::TIME_NS),
279
263
              lookup_data_ns(TUnit::TIME_NS),
280
263
              output_data_ns(TUnit::TIME_NS),
281
263
              load_segment_key_stage_ns(TUnit::TIME_NS),
282
263
              load_segment_data_stage_ns(TUnit::TIME_NS) {}
283
    RuntimeProfile::Counter init_ns;
284
    RuntimeProfile::Counter init_key_ns;
285
    RuntimeProfile::Counter lookup_key_ns;
286
    RuntimeProfile::Counter lookup_data_ns;
287
    RuntimeProfile::Counter output_data_ns;
288
    RuntimeProfile::Counter load_segment_key_stage_ns;
289
    RuntimeProfile::Counter load_segment_data_stage_ns;
290
    OlapReaderStatistics read_stats;
291
    size_t row_cache_hits = 0;
292
    bool hit_lookup_cache = false;
293
    size_t result_data_bytes;
294
};
295
296
// An util to do tablet lookup
297
class PointQueryExecutor {
298
public:
299
    ~PointQueryExecutor();
300
301
    Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response);
302
303
    Status lookup_up();
304
305
    void print_profile();
306
307
0
    const OlapReaderStatistics& read_stats() const { return _read_stats; }
308
309
private:
310
    Status _init_keys(const PTabletKeyLookupRequest* request);
311
312
    Status _lookup_row_key();
313
314
    Status _lookup_row_data();
315
316
    Status _output_data();
317
318
187
    static void release_rowset(RowsetSharedPtr* r) {
319
187
        if (r && *r) {
320
187
            VLOG_DEBUG << "release rowset " << (*r)->rowset_id();
321
187
            (*r)->release();
322
187
        }
323
187
        delete r;
324
187
    }
325
326
    // Read context for each row
327
    struct RowReadContext {
328
263
        RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {}
329
        std::string _primary_key;
330
        RowCache::CacheHandle _cached_row_data;
331
        std::optional<RowLocation> _row_location;
332
        // rowset will be aquired during read
333
        // and released after used
334
        std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr;
335
    };
336
337
    PTabletKeyLookupResponse* _response = nullptr;
338
    BaseTabletSPtr _tablet;
339
    std::vector<RowReadContext> _row_read_ctxs;
340
    std::shared_ptr<Reusable> _reusable;
341
    std::unique_ptr<Block> _result_block;
342
    Metrics _profile_metrics;
343
    bool _binary_row_format = false;
344
    OlapReaderStatistics _read_stats;
345
    int32_t _row_hits = 0;
346
    // snapshot read version
347
    int64_t _version = -1;
348
};
349
350
} // namespace doris