be/src/storage/segment/segment_loader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/segment_loader.h" |
19 | | |
20 | | #include <butil/time.h> |
21 | | |
22 | | #include "common/config.h" |
23 | | #include "common/status.h" |
24 | | #include "storage/olap_define.h" |
25 | | #include "storage/rowset/beta_rowset.h" |
26 | | #include "util/stopwatch.hpp" |
27 | | |
28 | | namespace doris { |
29 | | |
// Returns the SegmentLoader obtained from ExecEnv::GetInstance()->segment_loader().
30 | 2.31M | SegmentLoader* SegmentLoader::instance() { |
31 | 2.31M | return ExecEnv::GetInstance()->segment_loader(); |
32 | 2.31M | } |
33 | | |
// Looks up `key` (in its encoded form) through LRUCachePolicy::lookup().
// On a miss (nullptr handle) returns false and leaves `handle` untouched.
// On a hit, pushes this cache together with the returned LRU handle into `handle`
// via push_segment() and returns true.
34 | 2.11M | bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { |
35 | 2.11M | auto* lru_handle = LRUCachePolicy::lookup(key.encode()); |
36 | 2.11M | if (lru_handle == nullptr) { |
37 | 1.62M | return false; |
38 | 1.62M | } |
39 | 485k | handle->push_segment(this, lru_handle); |
40 | 485k | return true; |
41 | 2.11M | } |
42 | | |
// Inserts `value` under the encoded `key` through LRUCachePolicy::insert() with
// CachePriority::NORMAL, then pushes this cache and the returned LRU handle into `handle`.
// value.segment->meta_mem_usage() is passed for both size arguments of the insert call
// (presumably the entry's charge and its tracked byte size -- confirm against LRUCachePolicy).
// The address of `value` is what gets stored, so the caller must pass an object whose
// lifetime is handed over to the cache.
43 | | void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, |
44 | 6.41k | SegmentCacheHandle* handle) { |
45 | 6.41k | auto* lru_handle = |
46 | 6.41k | LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(), |
47 | 6.41k | value.segment->meta_mem_usage(), CachePriority::NORMAL); |
48 | 6.41k | handle->push_segment(this, lru_handle); |
49 | 6.41k | } |
50 | | |
// Erases the entry stored under the encoded `key` through LRUCachePolicy::erase().
51 | 57.8k | void SegmentCache::erase(const SegmentCache::CacheKey& key) { |
52 | 57.8k | LRUCachePolicy::erase(key.encode()); |
53 | 57.8k | } |
54 | | |
// Loads segment `segment_id` of `rowset` and hands it to `cache_handle`.
//
// The segment is first looked up in _segment_cache under (rowset id, segment id). On a hit
// for which cache_handle->pop_unhealthy_segment() returns nullptr, the cached segment is used
// as-is. Otherwise the segment is loaded through rowset->load_segment(); when
// `need_load_pk_index_and_bf` is true, load_pk_index_and_bf() is additionally called on it.
// The freshly loaded segment is inserted into the cache only when `use_cache` is true and
// config::disable_segment_cache is false; in every other case it is pushed directly into
// `cache_handle` without being cached.
//
// When `index_load_stats` is non-null, the elapsed time (MonotonicMicros() delta * 1000) is
// added to rowset_reader_load_segments_timer_ns on both paths that return Status::OK(); the
// timer is not updated when RETURN_IF_ERROR exits early.
//
// Returns Status::OK() on success, otherwise the status propagated through RETURN_IF_ERROR.
55 | | Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id, |
56 | | SegmentCacheHandle* cache_handle, bool use_cache, |
57 | | bool need_load_pk_index_and_bf, |
58 | 2.11M | OlapReaderStatistics* index_load_stats) { |
59 | 2.11M | auto start = MonotonicMicros(); |
60 | 2.11M | SegmentCache::CacheKey cache_key(rowset->rowset_id(), segment_id); |
61 | 2.11M | if (_segment_cache->lookup(cache_key, cache_handle)) { |
62 | | // The segment status must be checked here, because a cached segment may have hit an error while |
63 | | // loading its index or creating its column readers. |
64 | | // This check is deliberately kept separate from the lookup condition above to keep the logic clear. |
65 | 488k | if (cache_handle->pop_unhealthy_segment() == nullptr) { |
66 | 488k | if (index_load_stats != nullptr) { |
67 | 485k | index_load_stats->rowset_reader_load_segments_timer_ns += |
68 | 485k | (MonotonicMicros() - start) * 1000; |
69 | 485k | } |
70 | 488k | return Status::OK(); |
71 | 488k | } |
72 | 488k | } |
73 | | // Cache miss, or the cached segment was unhealthy: create a new segment, which replaces the unhealthy one in SegmentCache. |
74 | 1.62M | segment_v2::SegmentSharedPtr segment; |
75 | 1.62M | RETURN_IF_ERROR(rowset->load_segment(segment_id, index_load_stats, &segment)); |
76 | 1.62M | if (need_load_pk_index_and_bf) { |
77 | 70.0k | RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats)); |
78 | 70.0k | } |
79 | 1.62M | if (use_cache && !config::disable_segment_cache) { |
80 | | // Ownership of the heap-allocated SegmentCache::CacheValue passes to SegmentCache, which manages its memory. |
81 | 6.40k | auto* cache_value = new SegmentCache::CacheValue(segment); |
82 | 6.40k | _cache_mem_usage += segment->meta_mem_usage(); |
83 | 6.40k | _segment_cache->insert(cache_key, *cache_value, cache_handle); |
84 | 1.62M | } else { |
85 | 1.62M | cache_handle->push_segment(std::move(segment)); |
86 | 1.62M | } |
87 | 1.62M | if (index_load_stats != nullptr) { |
88 | 1.53M | index_load_stats->rowset_reader_load_segments_timer_ns += |
89 | 1.53M | (MonotonicMicros() - start) * 1000; |
90 | 1.53M | } |
91 | | |
92 | 1.62M | return Status::OK(); |
93 | 1.62M | } |
94 | | |
// Loads every segment of `rowset` into `cache_handle`.
//
// Returns Status::OK() immediately when cache_handle->is_inited() is already true.
// Otherwise calls load_segment() for each index in [0, rowset->num_segments()), forwarding
// `use_cache`, `need_load_pk_index_and_bf` and `index_load_stats` unchanged. The first
// failure is propagated through RETURN_IF_ERROR and the handle is then NOT marked as
// inited; set_inited() is only reached after all segments have loaded.
95 | | Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, |
96 | | SegmentCacheHandle* cache_handle, bool use_cache, |
97 | | bool need_load_pk_index_and_bf, |
98 | 134k | OlapReaderStatistics* index_load_stats) { |
99 | 134k | if (cache_handle->is_inited()) { |
100 | 0 | return Status::OK(); |
101 | 0 | } |
102 | 262k | for (int64_t i = 0; i < rowset->num_segments(); i++) { |
103 | 127k | RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache, need_load_pk_index_and_bf, |
104 | 127k | index_load_stats)); |
105 | 127k | } |
106 | 134k | cache_handle->set_inited(); |
107 | 134k | return Status::OK(); |
108 | 134k | } |
109 | | |
// Removes the entry for `key` by forwarding to _segment_cache->erase().
110 | 57.8k | void SegmentLoader::erase_segment(const SegmentCache::CacheKey& key) { |
111 | 57.8k | _segment_cache->erase(key); |
112 | 57.8k | } |
113 | | |
// Erases the cache entries keyed by (rowset_id, i) for every i in [0, num_segments).
// Does nothing when `num_segments` is zero or negative, since the loop body never runs.
114 | 193k | void SegmentLoader::erase_segments(const RowsetId& rowset_id, int64_t num_segments) { |
115 | 251k | for (int64_t i = 0; i < num_segments; i++) { |
116 | 57.8k | erase_segment(SegmentCache::CacheKey(rowset_id, i)); |
117 | 57.8k | } |
118 | 193k | } |
119 | | |
120 | | } // namespace doris |