be/src/storage/index/index_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/index/index_file_reader.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <utility> |
22 | | |
23 | | #include "storage/index/inverted/inverted_index_compound_reader.h" |
24 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
25 | | #include "storage/tablet/tablet_schema.h" |
26 | | #include "util/debug_points.h" |
27 | | |
28 | | namespace doris::segment_v2 { |
29 | | |
30 | 2.41k | Status IndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) { |
31 | 2.41k | std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing |
32 | 2.41k | if (!_inited) { |
33 | 2.31k | _read_buffer_size = read_buffer_size; |
34 | 2.31k | if (_storage_format >= InvertedIndexStorageFormatPB::V2) { |
35 | 2.30k | RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx)); |
36 | 2.30k | } |
37 | 2.29k | _inited = true; |
38 | 2.29k | } |
39 | 2.40k | return Status::OK(); |
40 | 2.41k | } |
41 | | |
42 | 2.30k | Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) { |
43 | 2.30k | auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); |
44 | | |
45 | 2.30k | try { |
46 | 2.30k | CLuceneError err; |
47 | 2.30k | CL_NS(store)::IndexInput* index_input = nullptr; |
48 | | |
49 | | // 1. get file size from meta |
50 | 2.30k | int64_t file_size = -1; |
51 | 2.30k | if (_idx_file_info.has_index_size()) { |
52 | 1.59k | file_size = _idx_file_info.index_size(); |
53 | 1.59k | } |
54 | 2.30k | file_size = file_size == 0 ? -1 : file_size; |
55 | | |
56 | 2.30k | DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { |
57 | 2.30k | if (file_size == -1) { |
58 | 2.30k | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
59 | 2.30k | "CLuceneError occur file size = -1, file is {}", index_file_full_path); |
60 | 2.30k | } |
61 | 2.30k | }) |
62 | | |
63 | 2.30k | DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: " |
64 | 0 | << index_file_full_path; |
65 | | // 2. open file |
66 | 2.30k | auto ok = |
67 | 2.30k | DorisFSDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), index_input, |
68 | 2.30k | err, read_buffer_size, file_size, _tablet_id); |
69 | 2.30k | if (!ok) { |
70 | 15 | if (err.number() == CL_ERR_FileNotFound) { |
71 | 14 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
72 | 14 | "inverted index file {} is not found.", index_file_full_path); |
73 | 14 | } else if (err.number() == CL_ERR_EmptyIndexSegment) { |
74 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>( |
75 | 1 | "inverted index file {} is empty.", index_file_full_path); |
76 | 1 | } |
77 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
78 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path, |
79 | 0 | err.what()); |
80 | 15 | } |
81 | 2.28k | _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input); |
82 | 2.28k | _stream->setIoContext(io_ctx); |
83 | 2.28k | _stream->setIndexFile(true); |
84 | | |
85 | | // 3. read file |
86 | 2.28k | int32_t version = _stream->readInt(); // Read version number |
87 | 2.28k | if (version >= InvertedIndexStorageFormatPB::V2) { |
88 | 2.28k | DCHECK(version == _storage_format); |
89 | 2.28k | int32_t numIndices = _stream->readInt(); // Read number of indices |
90 | | |
91 | 69.0k | for (int32_t i = 0; i < numIndices; ++i) { |
92 | 66.7k | int64_t indexId = _stream->readLong(); // Read index ID |
93 | 66.7k | int32_t suffix_length = _stream->readInt(); // Read suffix length |
94 | 66.7k | std::vector<uint8_t> suffix_data(suffix_length); |
95 | 66.7k | _stream->readBytes(suffix_data.data(), suffix_length); |
96 | 66.7k | std::string suffix_str(suffix_data.begin(), suffix_data.end()); |
97 | | |
98 | 66.7k | int32_t numFiles = _stream->readInt(); // Read number of files in the index |
99 | | |
100 | 66.7k | auto fileEntries = std::make_unique<EntriesType>(); |
101 | 66.7k | fileEntries->reserve(numFiles); |
102 | | |
103 | 690k | for (int32_t j = 0; j < numFiles; ++j) { |
104 | 623k | int32_t file_name_length = _stream->readInt(); |
105 | 623k | std::string file_name(file_name_length, '\0'); |
106 | 623k | _stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()), |
107 | 623k | file_name_length); |
108 | 623k | auto entry = std::make_unique<ReaderFileEntry>(); |
109 | 623k | entry->file_name = std::move(file_name); |
110 | 623k | entry->offset = _stream->readLong(); |
111 | 623k | entry->length = _stream->readLong(); |
112 | 623k | fileEntries->emplace(entry->file_name, std::move(entry)); |
113 | 623k | } |
114 | | |
115 | 66.7k | _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)), |
116 | 66.7k | std::move(fileEntries)); |
117 | 66.7k | } |
118 | 2.28k | } else { |
119 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
120 | 0 | "unknown inverted index format {}", version); |
121 | 0 | } |
122 | 2.28k | } catch (CLuceneError& err) { |
123 | 1 | if (_stream != nullptr) { |
124 | 1 | try { |
125 | 1 | _stream->close(); |
126 | 1 | } catch (CLuceneError& err) { |
127 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
128 | 0 | "CLuceneError occur when close idx file {}, error msg: {}", |
129 | 0 | index_file_full_path, err.what()); |
130 | 0 | } |
131 | 1 | } |
132 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
133 | 1 | "CLuceneError occur when init idx file {}, error msg: {}", index_file_full_path, |
134 | 1 | err.what()); |
135 | 1 | } |
136 | 2.28k | return Status::OK(); |
137 | 2.30k | } |
138 | | |
139 | 182 | Result<InvertedIndexDirectoryMap> IndexFileReader::get_all_directories() { |
140 | 182 | InvertedIndexDirectoryMap res; |
141 | 182 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
142 | 1.64k | for (auto& [index, _] : _indices_entries) { |
143 | 1.64k | auto&& [index_id, index_suffix] = index; |
144 | 1.64k | LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix; |
145 | 1.64k | auto ret = _open(index_id, index_suffix); |
146 | 1.64k | if (!ret.has_value()) { |
147 | 0 | return ResultError(ret.error()); |
148 | 0 | } |
149 | 1.64k | res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value())); |
150 | 1.64k | } |
151 | 182 | return res; |
152 | 182 | } |
153 | | |
154 | | Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::_open( |
155 | 4.76k | int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const { |
156 | 4.76k | std::unique_ptr<DorisCompoundReader, DirectoryDeleter> compound_reader; |
157 | | |
158 | 4.76k | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
159 | 15 | auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( |
160 | 15 | _index_path_prefix, index_id, index_suffix); |
161 | 15 | try { |
162 | 15 | CLuceneError err; |
163 | 15 | CL_NS(store)::IndexInput* index_input = nullptr; |
164 | | |
165 | | // 1. get file size from meta |
166 | 15 | int64_t file_size = -1; |
167 | 15 | if (_idx_file_info.index_info_size() > 0) { |
168 | 1 | for (const auto& idx_info : _idx_file_info.index_info()) { |
169 | 1 | if (index_id == idx_info.index_id() && |
170 | 1 | index_suffix == idx_info.index_suffix()) { |
171 | 1 | file_size = idx_info.index_file_size(); |
172 | 1 | break; |
173 | 1 | } |
174 | 1 | } |
175 | 1 | } |
176 | 15 | file_size = file_size == 0 ? -1 : file_size; |
177 | 15 | DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { |
178 | 15 | if (file_size == -1) { |
179 | 15 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
180 | 15 | "CLuceneError occur file size = -1, file is {}", index_file_path)); |
181 | 15 | } |
182 | 15 | }) |
183 | 15 | DCHECK(_fs != nullptr) |
184 | 0 | << "file system is nullptr, index_file_path: " << index_file_path; |
185 | | // 2. open file |
186 | 15 | auto ok = DorisFSDirectory::FSIndexInput::open(_fs, index_file_path.c_str(), |
187 | 15 | index_input, err, _read_buffer_size, |
188 | 15 | file_size, _tablet_id); |
189 | 15 | if (!ok) { |
190 | | // now index_input = nullptr |
191 | 1 | if (err.number() == CL_ERR_FileNotFound) { |
192 | 1 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
193 | 1 | "inverted index file {} is not found.", index_file_path)); |
194 | 1 | } |
195 | 0 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
196 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, |
197 | 0 | err.what())); |
198 | 1 | } |
199 | | |
200 | | // 3. read file in DorisCompoundReader |
201 | 14 | compound_reader.reset(new DorisCompoundReader(index_input, _read_buffer_size)); |
202 | 14 | } catch (CLuceneError& err) { |
203 | 0 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
204 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, |
205 | 0 | err.what())); |
206 | 0 | } |
207 | 4.74k | } else { |
208 | 4.74k | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
209 | 4.74k | if (_stream == nullptr) { |
210 | 1 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
211 | 1 | "CLuceneError occur when open idx file {}, stream is nullptr", |
212 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix))); |
213 | 1 | } |
214 | | |
215 | | // Check if the specified index exists |
216 | 4.74k | auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix)); |
217 | 4.74k | if (index_it == _indices_entries.end()) { |
218 | 4 | std::ostringstream errMsg; |
219 | 4 | errMsg << "No index with id " << index_id << " found"; |
220 | 4 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
221 | 4 | "CLuceneError occur when open idx file {}, error msg: {}", |
222 | 4 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix), |
223 | 4 | errMsg.str())); |
224 | 4 | } |
225 | | // Need to clone resource here, because index searcher cache need it. |
226 | 4.74k | compound_reader.reset(new DorisCompoundReader(_stream->clone(), *index_it->second, |
227 | 4.74k | _read_buffer_size, io_ctx)); |
228 | 4.74k | } |
229 | 4.75k | return compound_reader; |
230 | 4.76k | } |
231 | | |
232 | | Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::open( |
233 | 2.50k | const TabletIndex* index_meta, const io::IOContext* io_ctx) const { |
234 | 2.50k | auto index_id = index_meta->index_id(); |
235 | 2.50k | auto index_suffix = index_meta->get_index_suffix(); |
236 | 2.50k | return _open(index_id, index_suffix, io_ctx); |
237 | 2.50k | } |
238 | | |
239 | 331 | std::string IndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const { |
240 | 331 | return InvertedIndexDescriptor::get_index_file_cache_key( |
241 | 331 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
242 | 331 | } |
243 | | |
244 | 1.30k | std::string IndexFileReader::get_index_file_path(const TabletIndex* index_meta) const { |
245 | 1.30k | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
246 | 0 | return InvertedIndexDescriptor::get_index_file_path_v1( |
247 | 0 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
248 | 0 | } |
249 | 1.30k | return InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); |
250 | 1.30k | } |
251 | | |
252 | 1 | Status IndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const { |
253 | 1 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
254 | 0 | auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( |
255 | 0 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
256 | 0 | return _fs->exists(index_file_path, res); |
257 | 1 | } else { |
258 | 1 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
259 | 1 | if (_stream == nullptr) { |
260 | 1 | *res = false; |
261 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
262 | 1 | "idx file {} is not opened", |
263 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)); |
264 | 1 | } |
265 | | // Check if the specified index exists |
266 | 0 | auto index_it = _indices_entries.find( |
267 | 0 | std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); |
268 | 0 | if (index_it == _indices_entries.end()) { |
269 | 0 | *res = false; |
270 | 0 | } else { |
271 | 0 | *res = true; |
272 | 0 | } |
273 | 0 | } |
274 | 0 | return Status::OK(); |
275 | 1 | } |
276 | | |
277 | 5 | Status IndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const { |
278 | 5 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
279 | 1 | *res = true; |
280 | 1 | return Status::OK(); |
281 | 1 | } |
282 | 4 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
283 | 4 | if (_stream == nullptr) { |
284 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
285 | 1 | "idx file {} is not opened", |
286 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)); |
287 | 1 | } |
288 | | // Check if the specified index exists |
289 | 3 | auto index_it = _indices_entries.find( |
290 | 3 | std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); |
291 | 3 | if (index_it == _indices_entries.end()) { |
292 | 1 | *res = false; |
293 | 2 | } else { |
294 | 2 | const auto& entries = index_it->second; |
295 | 2 | auto entry_it = |
296 | 2 | entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name()); |
297 | 2 | if (entry_it == entries->end()) { |
298 | 0 | *res = false; |
299 | 0 | return Status::OK(); |
300 | 0 | } |
301 | 2 | const auto& e = entry_it->second; |
302 | | // roaring bitmap cookie header size is 5 |
303 | 2 | if (e->length <= 5) { |
304 | 1 | *res = false; |
305 | 1 | } else { |
306 | 1 | *res = true; |
307 | 1 | } |
308 | 2 | } |
309 | 3 | return Status::OK(); |
310 | 3 | } |
311 | | |
312 | 1 | void IndexFileReader::debug_file_entries() { |
313 | 1 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
314 | 1 | for (const auto& index : _indices_entries) { |
315 | 1 | LOG(INFO) << "index_id:" << index.first.first; |
316 | 1 | const auto& index_entries = index.second; |
317 | 2 | for (const auto& entry : *index_entries) { |
318 | 2 | const auto& file_entry = entry.second; |
319 | | LOG(INFO) << "file entry name:" << file_entry->file_name |
320 | 2 | << " length:" << file_entry->length << " offset:" << file_entry->offset; |
321 | 2 | } |
322 | 1 | } |
323 | 1 | } |
324 | | |
325 | | } // namespace doris::segment_v2 |