Coverage Report

Created: 2026-03-12 14:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cache/result_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "runtime/cache/result_cache.h"
18
19
#include <gen_cpp/internal_service.pb.h>
20
#include <glog/logging.h>
21
22
#include <iostream>
23
#include <list>
24
#include <utility>
25
26
#include "common/metrics/doris_metrics.h"
27
#include "runtime/cache/cache_utils.h"
28
#include "storage/olap_define.h"
29
30
namespace doris {
31
32
/**
33
* Remove the head node of the list
34
*/
35
0
/**
 * Detach the current head node from the list and return it.
 *
 * The node is only unlinked (via remove()); it is not destroyed here.
 *
 * @return the node that was unlinked, or nullptr when the list has no head.
 */
ResultNode* ResultNodeList::pop() {
    // Remember the node before unlinking it: remove() advances _head to the
    // successor, so returning _head afterwards would hand back a node that is
    // still in the list instead of the one that was popped.
    ResultNode* popped = _head;
    remove(popped);
    return popped;
}
39
40
0
/**
 * Unlink `node` from the list and decrement the node counter.
 *
 * A null `node` is ignored. The node object itself is not destroyed.
 */
void ResultNodeList::remove(ResultNode* node) {
    if (node == nullptr) {
        return;
    }
    // Move whichever end pointer currently refers to the node off of it
    // before the node's own links are cut.
    if (_head == node) {
        _head = node->get_next();
    }
    if (_tail == node) {
        _tail = node->get_prev();
    }
    node->unlink();
    --_node_count;
}
47
48
132
/**
 * Append `node` after the current tail and increment the node counter.
 *
 * A null `node` is ignored. When the list has no head yet, `node` also
 * becomes the head.
 */
void ResultNodeList::push_back(ResultNode* node) {
    if (node == nullptr) {
        return;
    }
    if (_head == nullptr) {
        _head = node;
    }
    // Link behind the old tail first, then publish the node as the new tail.
    node->append(_tail);
    _tail = node;
    ++_node_count;
}
55
56
247
/**
 * Relocate `node` to the tail of the list.
 *
 * Nothing happens when `node` is null or is already the tail. The node
 * counter is left untouched.
 */
void ResultNodeList::move_tail(ResultNode* node) {
    const bool nothing_to_do = (node == nullptr) || (node == _tail);
    if (nothing_to_do) {
        return;
    }

    // Keep _head valid: adopt the node when there is no head, or step past it
    // when the node being moved is the head.
    if (_head == nullptr) {
        _head = node;
    } else if (_head == node) {
        _head = node->get_next();
    }

    node->unlink();
    node->append(_tail);
    _tail = node;
}
66
67
0
/**
 * Clear the contents of `*node` and release it through SAFE_DELETE.
 *
 * `node` and `*node` are dereferenced unconditionally, so both must be
 * non-null. NOTE(review): SAFE_DELETE presumably also resets `*node` to
 * nullptr -- confirm against the macro definition.
 */
void ResultNodeList::delete_node(ResultNode** node) {
    ResultNode*& target = *node;
    target->clear();
    SAFE_DELETE(target);
}
71
72
30
void ResultNodeList::clear() {
73
30
    LOG(INFO) << "clear result node list.";
74
107
    while (_head) {
75
77
        ResultNode* tmp_node = _head->get_next();
76
77
        _head->clear();
77
77
        SAFE_DELETE(_head);
78
77
        _head = tmp_node;
79
77
    }
80
30
    _node_count = 0;
81
30
}
82
/**
83
 * Find the node and update partition data
84
 * A new node, or a node whose first partition was updated, is moved to the tail of the list
85
 */
86
134
void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* response) {
87
134
    ResultNode* node;
88
134
    PCacheStatus status;
89
134
    bool update_first = false;
90
134
    UniqueId sql_key = request->sql_key();
91
134
    LOG(INFO) << "update cache, sql key:" << sql_key;
92
93
134
    CacheWriteLock write_lock(_cache_mtx);
94
134
    auto it = _node_map.find(sql_key);
95
134
    if (it != _node_map.end()) {
96
2
        node = it->second;
97
2
        _cache_size -= node->get_data_size();
98
2
        _partition_count -= node->get_partition_count();
99
2
        status = node->update_partition(request, update_first);
100
132
    } else {
101
132
        node = _node_list.new_node(sql_key);
102
132
        status = node->update_partition(request, update_first);
103
132
        _node_list.push_back(node);
104
132
        _node_map[sql_key] = node;
105
132
        _node_count += 1;
106
132
    }
107
134
    if (update_first) {
108
132
        _node_list.move_tail(node);
109
132
    }
110
134
    _cache_size += node->get_data_size();
111
134
    _partition_count += node->get_partition_count();
112
134
    response->set_status(status);
113
114
134
    prune();
115
134
    update_monitor();
116
134
}
117
118
/**
119
 * Fetch cache through sql key, partition key, version and time
120
 */
121
122
void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* result) {
122
122
    bool hit_first = false;
123
122
    ResultNodeMap::iterator node_it;
124
122
    const UniqueId sql_key = request->sql_key();
125
122
    LOG(INFO) << "fetch cache, sql key:" << sql_key;
126
122
    {
127
122
        CacheReadLock read_lock(_cache_mtx);
128
122
        node_it = _node_map.find(sql_key);
129
122
        if (node_it == _node_map.end()) {
130
1
            result->set_status(PCacheStatus::NO_SQL_KEY);
131
1
            LOG(INFO) << "no such sql key:" << sql_key;
132
1
            return;
133
1
        }
134
121
        ResultNode* node = node_it->second;
135
121
        PartitionRowBatchList part_rowbatch_list;
136
121
        PCacheStatus status = node->fetch_partition(request, part_rowbatch_list, hit_first);
137
138
239
        for (auto part_it = part_rowbatch_list.begin(); part_it != part_rowbatch_list.end();
139
121
             part_it++) {
140
118
            PCacheValue* srcValue = (*part_it)->get_value();
141
118
            if (srcValue != nullptr) {
142
118
                PCacheValue* value = result->add_values();
143
118
                value->CopyFrom(*srcValue);
144
118
                LOG(INFO) << "fetch cache partition key:" << srcValue->param().partition_key();
145
118
            } else {
146
0
                LOG(WARNING) << "prowbatch of cache is null";
147
0
                status = PCacheStatus::EMPTY_DATA;
148
0
                break;
149
0
            }
150
118
        }
151
121
        if (status == PCacheStatus::CACHE_OK && part_rowbatch_list.empty()) {
152
0
            status = PCacheStatus::EMPTY_DATA;
153
0
        }
154
121
        result->set_status(status);
155
121
    }
156
157
121
    if (hit_first) {
158
115
        CacheWriteLock write_lock(_cache_mtx);
159
115
        _node_list.move_tail(node_it->second);
160
115
    }
161
121
}
162
163
0
bool ResultCache::contains(const UniqueId& sql_key) {
164
0
    CacheReadLock read_lock(_cache_mtx);
165
0
    return _node_map.find(sql_key) != _node_map.end();
166
0
}
167
168
/**
169
 * enum PClearType {
170
 *   CLEAR_ALL = 0,
171
 *   PRUNE_CACHE = 1,
172
 *   CLEAR_BEFORE_TIME = 2,
173
 *   CLEAR_SQL_KEY = 3
174
 * };
175
 */
176
14
void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* response) {
177
14
    LOG(INFO) << "clear cache type" << request->clear_type()
178
14
              << ", node size:" << _node_list.get_node_count() << ", map size:" << _node_map.size();
179
14
    CacheWriteLock write_lock(_cache_mtx);
180
    //0 clear, 1 prune, 2 before_time,3 sql_key
181
14
    switch (request->clear_type()) {
182
14
    case PClearType::CLEAR_ALL:
183
14
        _node_list.clear();
184
14
        _node_map.clear();
185
14
        _cache_size = 0;
186
14
        _node_count = 0;
187
14
        _partition_count = 0;
188
14
        break;
189
0
    case PClearType::PRUNE_CACHE:
190
0
        prune();
191
0
        break;
192
0
    default:
193
0
        break;
194
14
    }
195
14
    update_monitor();
196
14
    response->set_status(PCacheStatus::CACHE_OK);
197
14
}
198
199
//private method
200
0
/**
 * Choose between `result_node` and its direct neighbours by comparing
 * first_partition_last_time().
 *
 * The previous node wins when its time is less than or equal to the
 * current node's; otherwise the next node wins when its time is strictly
 * less; otherwise `result_node` itself is returned. Only the two direct
 * neighbours are examined.
 *
 * @param result_node starting node; must not be null.
 */
ResultNode* find_min_time_node(ResultNode* result_node) {
    ResultNode* const before = result_node->get_prev();
    if (before != nullptr &&
        before->first_partition_last_time() <= result_node->first_partition_last_time()) {
        return before;
    }

    ResultNode* const after = result_node->get_next();
    if (after != nullptr &&
        after->first_partition_last_time() < result_node->first_partition_last_time()) {
        return after;
    }

    return result_node;
}
216
217
/*
218
* Two-dimensional array, prune the min last_read_time PartitionRowBatch.
219
* The following example is the last read time array.
220
* 1 and 2 is the read time, nodes with pruning read time < 3
221
* Before:
222
*   1,2         //_head ResultNode*
223
*   1,2,3,4,5   
224
*   2,4,3,6,8   
225
*   5,7,9,11,13 //_tail ResultNode*
226
* After:
227
*   4,5         //_head
228
*   4,3,6,8
229
*   5,7,9,11,13 //_tail
230
*/
231
134
void ResultCache::prune() {
232
134
    if (_cache_size <= (_max_size + _elasticity_size)) {
233
134
        return;
234
134
    }
235
134
    LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", max_size : " << _max_size
236
0
              << ", elasticity_size : " << _elasticity_size;
237
0
    ResultNode* result_node = _node_list.get_head();
238
0
    while (_cache_size > _max_size) {
239
0
        if (result_node == nullptr) {
240
0
            break;
241
0
        }
242
0
        result_node = find_min_time_node(result_node);
243
0
        _cache_size -= result_node->prune_first();
244
0
        if (result_node->get_data_size() == 0) {
245
0
            ResultNode* next_node;
246
0
            if (result_node->get_next()) {
247
0
                next_node = result_node->get_next();
248
0
            } else if (result_node->get_prev()) {
249
0
                next_node = result_node->get_prev();
250
0
            } else {
251
0
                next_node = _node_list.get_head();
252
0
            }
253
0
            remove(result_node);
254
0
            result_node = next_node;
255
0
        }
256
0
    }
257
0
    LOG(INFO) << "finish prune, cache_size : " << _cache_size;
258
0
    _node_count = _node_map.size();
259
0
    _cache_size = 0;
260
0
    _partition_count = 0;
261
0
    for (auto node_it = _node_map.begin(); node_it != _node_map.end(); node_it++) {
262
0
        _partition_count += node_it->second->get_partition_count();
263
0
        _cache_size += node_it->second->get_data_size();
264
0
    }
265
0
}
266
267
0
/**
 * Drop `result_node` from the cache: erase its map entry, unlink it from
 * the node list and destroy it.
 *
 * Nothing happens when the node's sql key is not present in the map.
 *
 * @param result_node node to remove; must not be null.
 */
void ResultCache::remove(ResultNode* result_node) {
    const auto entry = _node_map.find(result_node->get_sql_key());
    if (entry == _node_map.end()) {
        return;
    }
    _node_map.erase(entry);
    _node_list.remove(result_node);
    _node_list.delete_node(&result_node);
}
275
276
148
void ResultCache::update_monitor() {
277
148
    DorisMetrics::instance()->query_cache_memory_total_byte->set_value(_cache_size);
278
148
    DorisMetrics::instance()->query_cache_sql_total_count->set_value(_node_count);
279
148
    DorisMetrics::instance()->query_cache_partition_total_count->set_value(_partition_count);
280
148
}
281
282
} // namespace doris