Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/cache/result_node.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "runtime/cache/result_node.h"
18
19
#include <gen_cpp/internal_service.pb.h>
20
#include <gen_cpp/types.pb.h>
21
#include <glog/logging.h>
22
23
#include <iostream>
24
#include <limits>
25
#include <utility>
26
27
#include "common/config.h"
28
#include "runtime/cache/cache_utils.h"
29
#include "storage/olap_define.h"
30
31
namespace doris {
32
33
51.2k
bool compare_partition(const PartitionRowBatch* left_node, const PartitionRowBatch* right_node) {
34
51.2k
    return left_node->get_partition_key() < right_node->get_partition_key();
35
51.2k
}
36
37
//return new batch size,only include the size of PRowBatch
38
82.3k
void PartitionRowBatch::set_row_batch(const PCacheValue& value) {
39
82.3k
    if (_cache_value != nullptr && !check_newer(value.param())) {
40
0
        LOG(WARNING) << "set old version data, cache ver:" << _cache_value->param().last_version()
41
0
                     << ",cache time:" << _cache_value->param().last_version_time()
42
0
                     << ", setdata ver:" << value.param().last_version()
43
0
                     << ",setdata time:" << value.param().last_version_time();
44
0
        return;
45
0
    }
46
82.3k
    SAFE_DELETE(_cache_value);
47
82.3k
    _cache_value = new PCacheValue(value);
48
82.3k
    _data_size += _cache_value->data_size();
49
82.3k
    _cache_stat.update();
50
82.3k
    LOG(INFO) << "finish set row batch, row num:" << _cache_value->rows_size()
51
82.3k
              << ", data size:" << _data_size;
52
82.3k
}
53
54
3.96k
bool PartitionRowBatch::is_hit_cache(const PCacheParam& param) {
55
3.96k
    if (!check_match(param)) {
56
1
        return false;
57
1
    }
58
3.96k
    _cache_stat.query();
59
3.96k
    return true;
60
3.96k
}
61
62
10.5k
void PartitionRowBatch::clear() {
63
10.5k
    LOG(INFO) << "clear partition rowbatch.";
64
10.5k
    SAFE_DELETE(_cache_value);
65
10.5k
    _partition_key = 0;
66
10.5k
    _data_size = 0;
67
10.5k
    _cache_stat.init();
68
10.5k
}
69
70
/**
71
 * Update partition cache data, find RowBatch from partition map by partition key,
72
 * the partition rowbatch are stored in the order of partition keys
73
 */
74
PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request,
75
72.0k
                                          bool& is_update_firstkey) {
76
72.0k
    is_update_firstkey = false;
77
72.0k
    if (_sql_key != request->sql_key()) {
78
0
        LOG(INFO) << "no match sql_key " << request->sql_key().hi() << request->sql_key().lo();
79
0
        return PCacheStatus::PARAM_ERROR;
80
0
    }
81
82
72.0k
    if (request->values_size() > config::query_cache_max_partition_count) {
83
1
        LOG(WARNING) << "too many partitions size:" << request->values_size();
84
1
        return PCacheStatus::PARAM_ERROR;
85
1
    }
86
87
    //Only one thread per SQL key can update the cache
88
72.0k
    CacheWriteLock write_lock(_node_mtx);
89
72.0k
    if (request->cache_type() == CacheType::SQL_CACHE) {
90
72.0k
        return update_sql_cache(request, is_update_firstkey);
91
72.0k
    } else {
92
17
        return update_partition_cache(request, is_update_firstkey);
93
17
    }
94
72.0k
}
95
96
PCacheStatus ResultNode::update_sql_cache(const PUpdateCacheRequest* request,
97
72.0k
                                          bool& is_update_firstkey) {
98
72.0k
    PartitionRowBatch* partition = nullptr;
99
72.0k
    if (request->values_size() > 1) {
100
0
        return PCacheStatus::PARAM_ERROR;
101
0
    }
102
72.0k
    is_update_firstkey = true;
103
72.0k
    const PCacheValue& value = request->values(0);
104
72.0k
    PartitionKey partition_key = value.param().partition_key();
105
    // no cache exist, create new cache node
106
72.0k
    if (_partition_map.size() == 0) {
107
71.7k
        partition = new PartitionRowBatch(partition_key);
108
71.7k
        partition->set_row_batch(value);
109
71.7k
        _partition_map[partition_key] = partition;
110
71.7k
        _partition_list.push_back(partition);
111
71.7k
    } else {
112
        // compatible with previous version
113
562
        for (auto it = _partition_list.begin(); it != _partition_list.end(); it++) {
114
281
            _data_size -= (*it)->get_data_size();
115
281
        }
116
        // clear old cache, and create new cache node
117
562
        for (auto it = _partition_list.begin(); it != _partition_list.end();) {
118
281
            (*it)->clear();
119
281
            SAFE_DELETE(*it);
120
281
            it = _partition_list.erase(it);
121
281
        }
122
281
        _partition_map.clear();
123
281
        partition = new PartitionRowBatch(partition_key);
124
281
        partition->set_row_batch(value);
125
281
        _partition_map[partition_key] = partition;
126
281
        _partition_list.push_back(partition);
127
281
    }
128
72.0k
    _data_size += partition->get_data_size();
129
72.0k
    VLOG(1) << "finish update sql cache batches:" << _partition_list.size();
130
72.0k
    return PCacheStatus::CACHE_OK;
131
72.0k
}
132
133
PCacheStatus ResultNode::update_partition_cache(const PUpdateCacheRequest* request,
134
17
                                                bool& is_update_firstkey) {
135
17
    PartitionKey first_key = std::numeric_limits<long>::max();
136
17
    if (_partition_list.size() == 0) {
137
16
        is_update_firstkey = true;
138
16
    } else {
139
1
        first_key = (*(_partition_list.begin()))->get_partition_key();
140
1
    }
141
17
    PartitionRowBatch* partition = nullptr;
142
10.2k
    for (int i = 0; i < request->values_size(); i++) {
143
10.2k
        const PCacheValue& value = request->values(i);
144
10.2k
        PartitionKey partition_key = value.param().partition_key();
145
10.2k
        if (!is_update_firstkey && partition_key <= first_key) {
146
0
            is_update_firstkey = true;
147
0
        }
148
10.2k
        auto it = _partition_map.find(partition_key);
149
10.2k
        if (it == _partition_map.end()) {
150
10.2k
            partition = new PartitionRowBatch(partition_key);
151
10.2k
            partition->set_row_batch(value);
152
10.2k
            _partition_map[partition_key] = partition;
153
10.2k
            _partition_list.push_back(partition);
154
#ifdef PARTITION_CACHE_DEV
155
            LOG(INFO) << "add index:" << i << ", pkey:" << partition->get_partition_key()
156
                      << ", list size:" << _partition_list.size()
157
                      << ", map size:" << _partition_map.size();
158
#endif
159
10.2k
        } else {
160
0
            partition = it->second;
161
0
            _data_size -= partition->get_data_size();
162
0
            partition->set_row_batch(value);
163
#ifdef PARTITION_CACHE_DEV
164
            LOG(INFO) << "update index:" << i << ", pkey:" << partition->get_partition_key()
165
                      << ", list size:" << _partition_list.size()
166
                      << ", map size:" << _partition_map.size();
167
#endif
168
0
        }
169
10.2k
        _data_size += partition->get_data_size();
170
10.2k
    }
171
17
    _partition_list.sort(compare_partition);
172
17
    VLOG(1) << "finish update partition cache batches:" << _partition_list.size();
173
17
    while (config::query_cache_max_partition_count > 0 &&
174
17
           _partition_list.size() > config::query_cache_max_partition_count) {
175
0
        if (prune_first() == 0) {
176
0
            break;
177
0
        }
178
0
    }
179
17
    return PCacheStatus::CACHE_OK;
180
17
}
181
182
/**
183
* Only the range query of the key of the partition is supported, and the separated partition key query is not supported.
184
* Because a query can only be divided into two parts, part1 get data from cache, part2 fetch_data by scan node from BE.
185
* Partition cache : 20191211-20191215
186
* Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216],[20191210 - 20191215]
187
* Miss cache parameter: [20191210 - 20191216]
188
*/
189
PCacheStatus ResultNode::fetch_partition(const PFetchCacheRequest* request,
190
                                         PartitionRowBatchList& row_batch_list,
191
3.96k
                                         bool& is_hit_firstkey) {
192
3.96k
    is_hit_firstkey = false;
193
3.96k
    if (request->params_size() == 0) {
194
0
        return PCacheStatus::PARAM_ERROR;
195
0
    }
196
197
3.96k
    CacheReadLock read_lock(_node_mtx);
198
199
3.96k
    if (_partition_list.size() == 0) {
200
0
        return PCacheStatus::NO_PARTITION_KEY;
201
0
    }
202
203
3.96k
    if (request->params(0).partition_key() > (*_partition_list.rbegin())->get_partition_key() ||
204
3.96k
        request->params(request->params_size() - 1).partition_key() <
205
3.96k
                (*_partition_list.begin())->get_partition_key()) {
206
3
        return PCacheStatus::NO_PARTITION_KEY;
207
3
    }
208
209
3.96k
    bool find = false;
210
3.96k
    int begin_idx = -1, end_idx = -1, param_idx = 0;
211
3.96k
    auto begin_it = _partition_list.end();
212
3.96k
    auto end_it = _partition_list.end();
213
3.96k
    auto part_it = _partition_list.begin();
214
215
3.96k
    PCacheStatus status = PCacheStatus::CACHE_OK;
216
7.92k
    while (param_idx < request->params_size() && part_it != _partition_list.end()) {
217
#ifdef PARTITION_CACHE_DEV
218
        LOG(INFO) << "Param index : " << param_idx
219
                  << ", param part Key : " << request->params(param_idx).partition_key()
220
                  << ", batch part key : " << (*part_it)->get_partition_key();
221
#endif
222
3.96k
        if (!find) {
223
3.96k
            while (part_it != _partition_list.end() &&
224
3.96k
                   request->params(param_idx).partition_key() > (*part_it)->get_partition_key()) {
225
1
                part_it++;
226
1
            }
227
3.96k
            if (part_it != _partition_list.end()) {
228
3.96k
                while (param_idx < request->params_size() &&
229
3.96k
                       request->params(param_idx).partition_key() <
230
3.96k
                               (*part_it)->get_partition_key()) {
231
2
                    param_idx++;
232
2
                }
233
3.96k
                if (param_idx < request->params_size()) {
234
3.96k
                    if (request->params(param_idx).partition_key() ==
235
3.96k
                        (*part_it)->get_partition_key()) {
236
3.96k
                        find = true;
237
3.96k
                    }
238
3.96k
                }
239
3.96k
            }
240
3.96k
        }
241
242
3.96k
        if (find) {
243
#ifdef PARTITION_CACHE_DEV
244
            LOG(INFO) << "Find! Param index : " << param_idx
245
                      << ", param part Key : " << request->params(param_idx).partition_key()
246
                      << ", batch part key : " << (*part_it)->get_partition_key()
247
                      << ", param part version : " << request->params(param_idx).last_version()
248
                      << ", batch part version : "
249
                      << (*part_it)->get_value()->param().last_version()
250
                      << ", param part version time : "
251
                      << request->params(param_idx).last_version_time()
252
                      << ", batch part version time : "
253
                      << (*part_it)->get_value()->param().last_version_time();
254
#endif
255
3.96k
            if ((*part_it)->is_hit_cache(request->params(param_idx))) {
256
3.96k
                if (begin_idx < 0) {
257
3.96k
                    begin_idx = param_idx;
258
3.96k
                    begin_it = part_it;
259
3.96k
                }
260
3.96k
                end_idx = param_idx;
261
3.96k
                end_it = part_it;
262
3.96k
                param_idx++;
263
3.96k
                part_it++;
264
3.96k
                find = false;
265
3.96k
            } else {
266
1
                status = PCacheStatus::DATA_OVERDUE;
267
1
                break;
268
1
            }
269
3.96k
        }
270
3.96k
    }
271
272
3.96k
    if (begin_it == _partition_list.end() && end_it == _partition_list.end()) {
273
1
        return status;
274
1
    }
275
276
    //[20191210 - 20191216] hit partition range [20191212-20191214],the sql will be splited to 3 part!
277
3.96k
    if (begin_idx != 0 && end_idx != request->params_size() - 1) {
278
1
        return PCacheStatus::INVALID_KEY_RANGE;
279
1
    }
280
3.96k
    if (begin_it == _partition_list.begin()) {
281
3.95k
        is_hit_firstkey = true;
282
3.95k
    }
283
284
3.96k
    while (true) {
285
3.96k
        row_batch_list.push_back(*begin_it);
286
3.96k
        if (begin_it == end_it) {
287
3.96k
            break;
288
3.96k
        }
289
2
        begin_it++;
290
2
    }
291
3.96k
    return status;
292
3.96k
}
293
294
/*
295
* prune first partition result
296
*/
297
0
size_t ResultNode::prune_first() {
298
0
    if (_partition_list.size() == 0) {
299
0
        return 0;
300
0
    }
301
0
    PartitionRowBatch* part_node = *_partition_list.begin();
302
0
    size_t prune_size = part_node->get_data_size();
303
0
    _partition_list.erase(_partition_list.begin());
304
0
    _partition_map.erase(part_node->get_partition_key());
305
0
    part_node->clear();
306
0
    SAFE_DELETE(part_node);
307
0
    _data_size -= prune_size;
308
0
    return prune_size;
309
0
}
310
311
77
void ResultNode::clear() {
312
77
    CacheWriteLock write_lock(_node_mtx);
313
77
    LOG(INFO) << "clear result node:" << _sql_key;
314
77
    _sql_key.hi = 0;
315
77
    _sql_key.lo = 0;
316
10.3k
    for (auto it = _partition_list.begin(); it != _partition_list.end();) {
317
10.3k
        (*it)->clear();
318
10.3k
        SAFE_DELETE(*it);
319
10.3k
        it = _partition_list.erase(it);
320
10.3k
    }
321
77
    _data_size = 0;
322
77
}
323
324
74.2k
void ResultNode::append(ResultNode* tail) {
325
74.2k
    _prev = tail;
326
74.2k
    if (tail) tail->set_next(this);
327
74.2k
}
328
329
2.48k
void ResultNode::unlink() {
330
2.48k
    if (_next) {
331
2.48k
        _next->set_prev(_prev);
332
2.48k
    }
333
2.48k
    if (_prev) {
334
2.48k
        _prev->set_next(_next);
335
2.48k
    }
336
2.48k
    _next = nullptr;
337
2.48k
    _prev = nullptr;
338
2.48k
}
339
340
} // namespace doris