Coverage Report

Created: 2026-07-03 16:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/point_query_executor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <assert.h>
21
#include <butil/macros.h>
22
#include <butil/time.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <parallel_hashmap/phmap.h>
25
#include <stdint.h>
26
#include <string.h>
27
28
#include <algorithm>
29
#include <memory>
30
#include <mutex>
31
#include <optional>
32
#include <ostream>
33
#include <string>
34
#include <unordered_map>
35
#include <utility>
36
#include <vector>
37
38
#include "butil/containers/doubly_buffered_data.h"
39
#include "common/config.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "core/block/block.h"
43
#include "core/data_type_serde/data_type_serde.h"
44
#include "exprs/vexpr_fwd.h"
45
#include "runtime/descriptors.h"
46
#include "runtime/exec_env.h"
47
#include "runtime/runtime_profile.h"
48
#include "storage/olap_common.h"
49
#include "storage/rowset/rowset.h"
50
#include "storage/tablet/tablet.h"
51
#include "storage/utils.h"
52
#include "util/lru_cache.h"
53
#include "util/mysql_global.h"
54
#include "util/slice.h"
55
56
namespace doris {
57
58
class PTabletKeyLookupRequest;
59
class PTabletKeyLookupResponse;
60
class RuntimeState;
61
class TDescriptorTable;
62
class TExpr;
63
64
// Caches pre-allocated blocks and exprs for point lookups
65
class Reusable {
66
public:
67
    ~Reusable();
68
69
0
    bool is_expired(int64_t ttl_ms) const {
70
0
        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
71
0
    }
72
73
    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
74
                const TQueryOptions& query_options, const TabletSchema& schema,
75
                size_t block_size = 1);
76
77
    std::unique_ptr<Block> get_block();
78
79
219
    const DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; }
80
81
227
    const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const {
82
227
        return _col_uid_to_idx;
83
227
    }
84
85
219
    const std::vector<std::string>& get_col_default_values() const { return _col_default_values; }
86
87
    // Do not touch the block after it has been returned
88
    void return_block(std::unique_ptr<Block>& block);
89
90
215k
    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }
91
92
432
    const VExprContextSPtrs& output_exprs() { return _output_exprs_ctxs; }
93
94
441
    int32_t rs_column_uid() const { return _row_store_column_ids; }
95
96
229
    const std::unordered_set<int32_t> missing_col_uids() const { return _missing_col_uids; }
97
98
440
    const std::unordered_set<int32_t> include_col_uids() const { return _include_col_uids; }
99
100
438
    RuntimeState* runtime_state() { return _runtime_state.get(); }
101
102
    void update_runtime_state(const PTabletKeyLookupRequest& request);
103
104
    // delete sign idx in block
105
430
    int32_t delete_sign_idx() const { return _delete_sign_idx; }
106
107
private:
108
    // caching TupleDescriptor, output_expr, etc...
109
    std::unique_ptr<RuntimeState> _runtime_state;
110
    DescriptorTbl* _desc_tbl = nullptr;
111
    std::mutex _block_mutex;
112
    // Avoids allocating too many temporary blocks
113
    std::vector<std::unique_ptr<Block>> _block_pool;
114
    VExprContextSPtrs _output_exprs_ctxs;
115
    int64_t _create_timestamp = 0;
116
    DataTypeSerDeSPtrs _data_type_serdes;
117
    std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
118
    std::vector<std::string> _col_default_values;
119
    // picked rowstore(column group) column unique id
120
    int32_t _row_store_column_ids = -1;
121
    // some columns are missing in rowstore(column group); they need to be filled with column store values
122
    std::unordered_set<int32_t> _missing_col_uids;
123
    // included cids in rowstore(column group)
124
    std::unordered_set<int32_t> _include_col_uids;
125
    // delete sign idx in block
126
    int32_t _delete_sign_idx = -1;
127
};
128
129
// RowCache is an LRU cache for row store
130
class RowCache : public LRUCachePolicy {
131
public:
132
    using LRUCachePolicy::insert;
133
134
    // The cache key for row lru cache
135
    struct RowCacheKey {
136
13
        RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {}
137
        int64_t tablet_id;
138
        Slice key;
139
140
        // Encode to a flat binary which can be used as LRUCache's key
141
16
        std::string encode() const {
142
16
            std::string full_key;
143
16
            full_key.reserve(sizeof(int64_t) + key.size);
144
16
            const char* tid = reinterpret_cast<const char*>(&tablet_id);
145
16
            full_key.append(tid, tid + sizeof(int64_t));
146
16
            full_key.append(key.data, key.size);
147
16
            return full_key;
148
16
        }
149
    };
150
151
    class RowCacheValue : public LRUCacheValueBase {
152
    public:
153
11
        ~RowCacheValue() override { free(cache_value); }
154
        // Buffer released with free() in the destructor. Defaults to nullptr so
        // that destroying a value whose buffer was never assigned is a harmless
        // free(nullptr) rather than a free of an indeterminate pointer.
        char* cache_value = nullptr;
155
    };
156
157
    // A handle for a RowCache entry. This class makes it easy to handle
158
    // Cache entry. Users don't need to release the obtained cache entry. This
159
    // class will release the cache entry when it is destroyed.
160
    class CacheHandle {
161
    public:
162
229
        CacheHandle() = default;
163
        CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
164
12
                : _cache(cache), _handle(handle) {}
165
241
        ~CacheHandle() {
166
241
            if (_handle != nullptr) {
167
12
                _cache->release(_handle);
168
12
            }
169
241
        }
170
171
0
        CacheHandle(CacheHandle&& other) noexcept {
172
0
            std::swap(_cache, other._cache);
173
0
            std::swap(_handle, other._handle);
174
0
        }
175
176
1
        CacheHandle& operator=(CacheHandle&& other) noexcept {
177
1
            std::swap(_cache, other._cache);
178
1
            std::swap(_handle, other._handle);
179
1
            return *this;
180
1
        }
181
182
227
        bool valid() { return _cache != nullptr && _handle != nullptr; }
183
184
0
        LRUCachePolicy* cache() const { return _cache; }
185
1
        Slice data() const {
186
1
            return {((RowCacheValue*)_cache->value(_handle))->cache_value,
187
1
                    reinterpret_cast<LRUHandle*>(_handle)->charge};
188
1
        }
189
190
    private:
191
        LRUCachePolicy* _cache = nullptr;
192
        Cache::Handle* _handle = nullptr;
193
194
        // Don't allow copy and assign
195
        DISALLOW_COPY_AND_ASSIGN(CacheHandle);
196
    };
197
198
    // Create global instance of this class
199
    static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards);
200
201
    static RowCache* instance();
202
203
    // Lookup a row key from cache,
204
    // If the Row key is found, the cache entry will be written into handle.
205
    // CacheHandle will release cache entry to cache when it destructs
206
    // Return true if entry is found, otherwise return false.
207
    bool lookup(const RowCacheKey& key, CacheHandle* handle);
208
209
    // Insert a row with key into this cache.
210
    // This function is thread-safe, and when two clients insert two same key
211
    // concurrently, this function can assure that only one page is cached.
212
    // The in_memory page will have higher priority.
213
    void insert(const RowCacheKey& key, const Slice& data);
214
215
    // Erase the cached row stored under `key`.
216
    void erase(const RowCacheKey& key);
217
218
private:
219
    static constexpr uint32_t kDefaultNumShards = 128;
220
    RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
221
};
222
223
// A cache used for prepare stmt.
224
// One connection per stmt per uuid
225
class LookupConnectionCache : public LRUCachePolicy {
226
public:
227
302
    static LookupConnectionCache* instance() {
228
302
        return ExecEnv::GetInstance()->get_lookup_connection_cache();
229
302
    }
230
231
    static LookupConnectionCache* create_global_instance(size_t capacity);
232
233
private:
234
    friend class PointQueryExecutor;
235
    LookupConnectionCache(size_t capacity)
236
19
            : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity,
237
19
                             LRUCacheType::NUMBER, config::tablet_lookup_cache_stale_sweep_time_sec,
238
19
                             /*num shards*/ 32, /*element count capacity */ 0,
239
19
                             /*enable prune*/ true, /*is lru-k*/ true) {}
240
241
4.44k
    static std::string encode_key(__int128_t cache_id) {
242
4.44k
        fmt::memory_buffer buffer;
243
4.44k
        fmt::format_to(buffer, "{}", cache_id);
244
4.44k
        return std::string(buffer.data(), buffer.size());
245
4.44k
    }
246
247
2.17k
    void add(__int128_t cache_id, std::shared_ptr<Reusable> item) {
248
2.17k
        std::string key = encode_key(cache_id);
249
2.17k
        auto* value = new CacheValue;
250
2.17k
        value->item = item;
251
2.17k
        VLOG_DEBUG << "Add item mem"
252
0
                   << ", cache_capacity: " << get_capacity() << ", cache_usage: " << get_usage()
253
0
                   << ", mem_consum: " << mem_consumption();
254
2.17k
        auto* lru_handle = insert(key, value, 1, sizeof(Reusable), CachePriority::NORMAL);
255
2.17k
        release(lru_handle);
256
2.17k
    }
257
258
2.27k
    std::shared_ptr<Reusable> get(__int128_t cache_id) {
259
2.27k
        std::string key = encode_key(cache_id);
260
2.27k
        auto* lru_handle = lookup(key);
261
2.27k
        if (lru_handle) {
262
139
            Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
263
139
            auto* value = (CacheValue*)(LRUCachePolicy::value(lru_handle));
264
139
            return value->item;
265
139
        }
266
2.13k
        return nullptr;
267
2.27k
    }
268
269
    class CacheValue : public LRUCacheValueBase {
270
    public:
271
        ~CacheValue() override;
272
        std::shared_ptr<Reusable> item;
273
    };
274
};
275
276
struct Metrics {
277
    Metrics()
278
227
            : init_ns(TUnit::TIME_NS),
279
227
              init_key_ns(TUnit::TIME_NS),
280
227
              lookup_key_ns(TUnit::TIME_NS),
281
227
              lookup_data_ns(TUnit::TIME_NS),
282
227
              output_data_ns(TUnit::TIME_NS),
283
227
              load_segment_key_stage_ns(TUnit::TIME_NS),
284
227
              load_segment_data_stage_ns(TUnit::TIME_NS) {}
285
    RuntimeProfile::Counter init_ns;
286
    RuntimeProfile::Counter init_key_ns;
287
    RuntimeProfile::Counter lookup_key_ns;
288
    RuntimeProfile::Counter lookup_data_ns;
289
    RuntimeProfile::Counter output_data_ns;
290
    RuntimeProfile::Counter load_segment_key_stage_ns;
291
    RuntimeProfile::Counter load_segment_data_stage_ns;
292
    OlapReaderStatistics read_stats;
293
    size_t row_cache_hits = 0;
294
    bool hit_lookup_cache = false;
295
    // Zero-initialized like row_cache_hits above, so reading it before any
    // value has been stored is well defined.
    size_t result_data_bytes = 0;
296
};
297
298
// A utility for tablet lookups
299
class PointQueryExecutor {
300
public:
301
    ~PointQueryExecutor();
302
303
    Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response);
304
305
    Status lookup_up();
306
307
    void print_profile();
308
309
0
    const OlapReaderStatistics& read_stats() const { return _read_stats; }
310
311
private:
312
    Status _init_keys(const PTabletKeyLookupRequest* request);
313
314
    Status _lookup_row_key();
315
316
    Status _lookup_row_data();
317
318
    Status _output_data();
319
320
219
    static void release_rowset(RowsetSharedPtr* r) {
321
219
        if (r && *r) {
322
219
            VLOG_DEBUG << "release rowset " << (*r)->rowset_id();
323
219
            (*r)->release();
324
219
        }
325
219
        delete r;
326
219
    }
327
328
    // Read context for each row
329
    struct RowReadContext {
330
225
        RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {}
331
        std::string _primary_key;
332
        RowCache::CacheHandle _cached_row_data;
333
        std::optional<RowLocation> _row_location;
334
        // rowset will be acquired during read
335
        // and released after used
336
        std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr;
337
    };
338
339
    PTabletKeyLookupResponse* _response = nullptr;
340
    BaseTabletSPtr _tablet;
341
    std::vector<RowReadContext> _row_read_ctxs;
342
    std::shared_ptr<Reusable> _reusable;
343
    std::unique_ptr<Block> _result_block;
344
    Metrics _profile_metrics;
345
    bool _binary_row_format = false;
346
    OlapReaderStatistics _read_stats;
347
    int32_t _row_hits = 0;
348
    // snapshot read version
349
    int64_t _version = -1;
350
};
351
352
} // namespace doris