be/src/storage/segment/column_reader_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/column_reader_cache.h" |
19 | | |
20 | | #include "storage/segment/column_meta_accessor.h" |
21 | | #include "storage/segment/segment.h" |
22 | | #include "storage/segment/variant/variant_column_reader.h" |
23 | | |
24 | | bvar::Adder<int64_t> g_segment_column_reader_cache_count("segment_column_cache_count"); |
25 | | bvar::Adder<int64_t> g_segment_column_cache_hit_count("segment_column_cache_hit_count"); |
26 | | bvar::Adder<int64_t> g_segment_column_cache_miss_count("segment_column_cache_miss_count"); |
27 | | bvar::Adder<int64_t> g_segment_column_cache_evict_count("segment_column_cache_evict_count"); |
28 | | |
29 | | namespace doris::segment_v2 { |
30 | | |
31 | | #include "common/compile_check_begin.h" |
32 | | |
33 | | ColumnReaderCache::ColumnReaderCache( |
34 | | ColumnMetaAccessor* accessor, TabletSchemaSPtr tablet_schema, |
35 | | io::FileReaderSPtr file_reader, uint64_t num_rows, |
36 | | std::function<Status(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics*)> |
37 | | get_footer_cb) |
38 | 5.73k | : _accessor(accessor), |
39 | 5.73k | _tablet_schema(std::move(tablet_schema)), |
40 | 5.73k | _file_reader(std::move(file_reader)), |
41 | 5.73k | _num_rows(num_rows), |
42 | 5.73k | _get_footer_cb(std::move(get_footer_cb)) {} |
43 | | |
44 | 5.73k | ColumnReaderCache::~ColumnReaderCache() { |
45 | 5.73k | g_segment_column_reader_cache_count << -_cache_map.size(); |
46 | 5.73k | } |
47 | | |
48 | 17.4k | std::shared_ptr<ColumnReader> ColumnReaderCache::_lookup(const ColumnReaderCacheKey& key) { |
49 | 17.4k | std::lock_guard<std::mutex> lock(_cache_mutex); |
50 | 17.4k | auto it = _cache_map.find(key); |
51 | 17.4k | if (it == _cache_map.end()) { |
52 | 13.4k | g_segment_column_cache_miss_count << 1; |
53 | 13.4k | return nullptr; |
54 | 13.4k | } |
55 | | // Move the accessed node to the front of the linked list |
56 | 3.96k | _lru_list.splice(_lru_list.begin(), _lru_list, it->second); |
57 | 3.96k | DCHECK_EQ(it->second->key.first, key.first); |
58 | 3.96k | g_segment_column_cache_hit_count << 1; |
59 | 3.96k | return it->second->reader; |
60 | 17.4k | } |
61 | | |
62 | | void ColumnReaderCache::_insert_locked_nocheck(const ColumnReaderCacheKey& key, |
63 | 13.4k | const std::shared_ptr<ColumnReader>& reader) { |
64 | | // If capacity exceeded, remove least recently used (tail) |
65 | 13.4k | if (_cache_map.size() >= config::max_segment_partial_column_cache_size) { |
66 | 220 | g_segment_column_reader_cache_count << -1; |
67 | 220 | g_segment_column_cache_evict_count << 1; |
68 | 220 | auto last_it = _lru_list.end(); |
69 | 220 | --last_it; |
70 | 220 | _cache_map.erase(last_it->key); |
71 | 220 | _lru_list.pop_back(); |
72 | 220 | } |
73 | 13.4k | g_segment_column_reader_cache_count << 1; |
74 | 13.4k | _lru_list.push_front(CacheNode { |
75 | 13.4k | .key = key, .reader = reader, .last_access = std::chrono::steady_clock::now()}); |
76 | 13.4k | _cache_map[key] = _lru_list.begin(); |
77 | 13.4k | } |
78 | | |
79 | | void ColumnReaderCache::_insert_direct(const ColumnReaderCacheKey& key, |
80 | 13.4k | const std::shared_ptr<ColumnReader>& column_reader) { |
81 | 13.4k | std::lock_guard<std::mutex> lock(_cache_mutex); |
82 | 13.4k | _insert_locked_nocheck(key, column_reader); |
83 | 13.4k | } |
84 | | |
85 | | std::map<int32_t, std::shared_ptr<ColumnReader>> ColumnReaderCache::get_available_readers( |
86 | 7 | bool include_subcolumns) { |
87 | 7 | std::lock_guard<std::mutex> lock(_cache_mutex); |
88 | 7 | std::map<int32_t, std::shared_ptr<ColumnReader>> readers; |
89 | 12 | for (const auto& node : _lru_list) { |
90 | 12 | if (include_subcolumns || node.key.second.empty()) { |
91 | 12 | readers.insert({node.key.first, node.reader}); |
92 | 12 | } |
93 | 12 | } |
94 | 7 | return readers; |
95 | 7 | } |
96 | | |
97 | | Status ColumnReaderCache::get_column_reader(int32_t col_uid, |
98 | | std::shared_ptr<ColumnReader>* column_reader, |
99 | 17.1k | OlapReaderStatistics* stats) { |
100 | | // Attempt to find in cache |
101 | 17.1k | if (auto cached = _lookup({col_uid, {}})) { |
102 | 3.97k | *column_reader = cached; |
103 | 3.97k | return Status::OK(); |
104 | 3.97k | } |
105 | | // Load footer once under cache mutex (not thread-safe otherwise) |
106 | 13.1k | std::shared_ptr<SegmentFooterPB> footer_pb_shared; |
107 | 13.1k | { |
108 | 13.1k | std::lock_guard<std::mutex> lock(_cache_mutex); |
109 | 13.1k | RETURN_IF_ERROR(_get_footer_cb(footer_pb_shared, stats)); |
110 | 13.1k | } |
111 | | |
112 | | // Lookup column meta by uid via ColumnMetaAccessor. If not initialized or not found, return NOT_FOUND. |
113 | 13.1k | ColumnMetaPB meta; |
114 | 13.1k | Status st_meta = _accessor->get_column_meta_by_uid(*footer_pb_shared, col_uid, &meta); |
115 | 13.1k | if (st_meta.is<ErrorCode::NOT_FOUND>()) { |
116 | 0 | *column_reader = nullptr; |
117 | 0 | return st_meta; |
118 | 0 | } |
119 | 13.1k | RETURN_IF_ERROR(st_meta); |
120 | | |
121 | 13.1k | ColumnReaderOptions opts {.kept_in_memory = _tablet_schema->is_in_memory(), |
122 | 13.1k | .be_exec_version = _be_exec_version, |
123 | 13.1k | .tablet_schema = _tablet_schema}; |
124 | | |
125 | 13.1k | std::shared_ptr<ColumnReader> reader; |
126 | 13.1k | if ((FieldType)meta.type() == FieldType::OLAP_FIELD_TYPE_VARIANT) { |
127 | | // Variant root columns require VariantColumnReader, which encapsulates |
128 | | // subcolumn layout, sparse columns and external meta. |
129 | 359 | std::unique_ptr<VariantColumnReader> variant_reader(new VariantColumnReader()); |
130 | 359 | RETURN_IF_ERROR(variant_reader->init(opts, _accessor, footer_pb_shared, col_uid, _num_rows, |
131 | 359 | _file_reader)); |
132 | 359 | reader.reset(variant_reader.release()); |
133 | 359 | VLOG_DEBUG << "insert cache (variant): uid=" << col_uid << " col_id=" << meta.column_id(); |
134 | 12.7k | } else { |
135 | | // For non-variant columns, we can create reader directly from ColumnMetaPB. |
136 | 18.4E | VLOG_DEBUG << "insert cache: uid=" << col_uid << " col_id=" << meta.column_id(); |
137 | 12.7k | RETURN_IF_ERROR(ColumnReader::create(opts, meta, _num_rows, _file_reader, &reader)); |
138 | 12.7k | } |
139 | | |
140 | 13.1k | _insert_direct({col_uid, {}}, reader); |
141 | 13.1k | *column_reader = std::move(reader); |
142 | 13.1k | return Status::OK(); |
143 | 13.1k | } |
144 | | |
145 | | Status ColumnReaderCache::get_path_column_reader(int32_t col_uid, PathInData relative_path, |
146 | | std::shared_ptr<ColumnReader>* column_reader, |
147 | | OlapReaderStatistics* stats, |
148 | 306 | const SubcolumnColumnMetaInfo::Node* node_hint) { |
149 | | // Attempt to find in cache first |
150 | 306 | if (auto cached = _lookup({col_uid, relative_path})) { |
151 | 0 | *column_reader = cached; |
152 | 0 | return Status::OK(); |
153 | 0 | } |
154 | | |
155 | 306 | if (!_accessor->has_column_uid(col_uid)) { |
156 | 0 | *column_reader = nullptr; |
157 | 0 | return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}", |
158 | 0 | col_uid); |
159 | 0 | } |
160 | | |
161 | | // Load footer once under cache mutex (not thread-safe otherwise) |
162 | 306 | std::shared_ptr<SegmentFooterPB> footer_pb_shared; |
163 | 306 | { |
164 | 306 | std::lock_guard<std::mutex> lock(_cache_mutex); |
165 | 306 | RETURN_IF_ERROR(_get_footer_cb(footer_pb_shared, stats)); |
166 | 306 | } |
167 | | |
168 | | // Ensure variant root reader is available in cache. |
169 | 306 | ColumnReaderOptions opts {.kept_in_memory = _tablet_schema->is_in_memory(), |
170 | 306 | .be_exec_version = _be_exec_version, |
171 | 306 | .tablet_schema = _tablet_schema}; |
172 | 306 | std::shared_ptr<ColumnReader> variant_column_reader; |
173 | 306 | RETURN_IF_ERROR(get_column_reader(col_uid, &variant_column_reader, stats)); |
174 | | |
175 | 306 | if (relative_path.empty()) { |
176 | 0 | *column_reader = std::move(variant_column_reader); |
177 | 0 | return Status::OK(); |
178 | 0 | } |
179 | | |
180 | | // Delegate path-level reader creation to VariantColumnReader, which hides |
181 | | // inline vs external meta details. |
182 | 306 | std::shared_ptr<ColumnReader> path_reader; |
183 | 306 | auto* vreader = static_cast<VariantColumnReader*>(variant_column_reader.get()); |
184 | 306 | Status st = vreader->create_path_reader(relative_path, opts, _accessor, *footer_pb_shared, |
185 | 306 | _file_reader, _num_rows, &path_reader); |
186 | 306 | if (st.is<ErrorCode::NOT_FOUND>()) { |
187 | 5 | *column_reader = nullptr; |
188 | 5 | return st; |
189 | 5 | } |
190 | 301 | RETURN_IF_ERROR(st); |
191 | | |
192 | | // Cache and return |
193 | 301 | _insert_direct({col_uid, relative_path}, path_reader); |
194 | 301 | *column_reader = std::move(path_reader); |
195 | 301 | return Status::OK(); |
196 | 301 | } |
197 | | |
198 | | #include "common/compile_check_end.h" |
199 | | |
200 | | } // namespace doris::segment_v2 |