Coverage Report

Created: 2026-03-14 13:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cache/result_node.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/internal_service.pb.h>
21
22
#include <cstdio>
23
#include <list>
24
#include <shared_mutex>
25
#include <unordered_map>
26
27
#include "runtime/cache/cache_utils.h"
28
#include "util/uid_util.h"
29
30
namespace doris {
31
32
/**
33
* Cache one partition data, request param must match version and time of cache
34
*/
35
class PartitionRowBatch {
36
public:
37
    PartitionRowBatch(int64_t partition_key)
38
82.8k
            : _partition_key(partition_key), _cache_value(nullptr), _data_size(0) {}
39
40
10.5k
    ~PartitionRowBatch() {}
41
42
    void set_row_batch(const PCacheValue& value);
43
    bool is_hit_cache(const PCacheParam& param);
44
    void clear();
45
46
121k
    int64_t get_partition_key() const { return _partition_key; }
47
48
3.89k
    PCacheValue* get_value() { return _cache_value; }
49
50
83.1k
    size_t get_data_size() { return _data_size; }
51
52
0
    const CacheStat* get_stat() const { return &_cache_stat; }
53
54
private:
55
3.90k
    bool check_match(const PCacheParam& req_param) {
56
3.90k
        if (req_param.partition_key() != _partition_key) {
57
0
            return false;
58
0
        }
59
3.90k
        if (req_param.last_version() > _cache_value->param().last_version()) {
60
1
            return false;
61
1
        }
62
3.90k
        if (req_param.last_version_time() > _cache_value->param().last_version_time()) {
63
0
            return false;
64
0
        }
65
3.90k
        if (req_param.partition_num() != _cache_value->param().partition_num()) {
66
0
            return false;
67
0
        }
68
3.90k
        return true;
69
3.90k
    }
70
71
0
    bool check_newer(const PCacheParam& up_param) {
72
        //for init data of sql cache
73
0
        if (up_param.last_version() == 0 || up_param.last_version_time() == 0) {
74
0
            return true;
75
0
        }
76
0
        if (up_param.last_version_time() > _cache_value->param().last_version_time()) {
77
0
            return true;
78
0
        }
79
0
        if (up_param.last_version_time() == _cache_value->param().last_version_time() &&
80
0
            up_param.partition_num() != _cache_value->param().partition_num()) {
81
0
            return true;
82
0
        }
83
0
        if (up_param.last_version() > _cache_value->param().last_version()) {
84
0
            return true;
85
0
        }
86
0
        if (up_param.last_version() == _cache_value->param().last_version() &&
87
0
            up_param.partition_num() != _cache_value->param().partition_num()) {
88
0
            return true;
89
0
        }
90
0
        return false;
91
0
    }
92
93
private:
94
    int64_t _partition_key;
95
    PCacheValue* _cache_value = nullptr;
96
    size_t _data_size;
97
    CacheStat _cache_stat;
98
};
99
100
typedef int64_t PartitionKey;
101
typedef std::list<PartitionRowBatch*> PartitionRowBatchList;
102
typedef std::unordered_map<PartitionKey, PartitionRowBatch*> PartitionRowBatchMap;
103
104
/**
105
* Cache the result of one SQL, include many partition rowsets.
106
* Sql Cache: The partition ID comes from the partition last updated.
107
* Partition Cache: The partition ID comes from the partition scanned by query.
108
* The above two modes use the same cache structure.
109
*/
110
class ResultNode {
111
public:
112
0
    ResultNode() : _sql_key(0, 0), _prev(nullptr), _next(nullptr), _data_size(0) {}
113
114
    ResultNode(const UniqueId& sql_key)
115
72.4k
            : _sql_key(sql_key), _prev(nullptr), _next(nullptr), _data_size(0) {}
116
117
22
    virtual ~ResultNode() {}
118
119
    PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& is_update_firstkey);
120
    PCacheStatus fetch_partition(const PFetchCacheRequest* request,
121
                                 PartitionRowBatchList& rowBatchList, bool& is_hit_firstkey);
122
    PCacheStatus update_sql_cache(const PUpdateCacheRequest* request, bool& is_update_firstkey);
123
    PCacheStatus update_partition_cache(const PUpdateCacheRequest* request,
124
                                        bool& is_update_firstkey);
125
126
    size_t prune_first();
127
    void clear();
128
129
0
    ResultNode* get_prev() { return _prev; }
130
131
2.46k
    void set_prev(ResultNode* prev) { _prev = prev; }
132
133
22
    ResultNode* get_next() { return _next; }
134
135
77.3k
    void set_next(ResultNode* next) { _next = next; }
136
137
    void append(ResultNode* tail);
138
139
    void unlink();
140
141
72.9k
    size_t get_partition_count() const { return _partition_list.size(); }
142
143
72.9k
    size_t get_data_size() const { return _data_size; }
144
145
0
    UniqueId get_sql_key() { return _sql_key; }
146
147
0
    bool sql_key_null() { return _sql_key.hi == 0 && _sql_key.lo == 0; }
148
149
0
    void set_sql_key(const UniqueId& sql_key) { _sql_key = sql_key; }
150
151
0
    long first_partition_last_time() const {
152
0
        if (_partition_list.size() == 0) {
153
0
            return 0;
154
0
        }
155
0
        const PartitionRowBatch* first = *(_partition_list.begin());
156
0
        return first->get_stat()->last_read_time;
157
0
    }
158
159
0
    const CacheStat* get_first_stat() const {
160
0
        if (_partition_list.size() == 0) {
161
0
            return nullptr;
162
0
        }
163
0
        return (*(_partition_list.begin()))->get_stat();
164
0
    }
165
166
0
    // Returns the CacheStat of the last partition in _partition_list, or
    // nullptr when the list is empty.
    const CacheStat* get_last_stat() const {
167
0
        if (_partition_list.size() == 0) {
168
0
            return nullptr;
169
0
        }
170
0
        // back() yields the last element. The former `*(end()--)` post-decremented
        // a temporary and dereferenced the past-the-end iterator itself.
        return _partition_list.back()->get_stat();
171
0
    }
172
173
private:
174
    mutable std::shared_mutex _node_mtx;
175
    UniqueId _sql_key;
176
    ResultNode* _prev = nullptr;
177
    ResultNode* _next = nullptr;
178
    size_t _data_size;
179
    PartitionRowBatchList _partition_list;
180
    PartitionRowBatchMap _partition_map;
181
};
182
183
} // namespace doris