be/src/storage/index/index_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/index/index_file_reader.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <utility> |
22 | | |
23 | | #include "storage/index/inverted/inverted_index_compound_reader.h" |
24 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
25 | | #include "storage/tablet/tablet_schema.h" |
26 | | #include "util/debug_points.h" |
27 | | |
28 | | namespace doris::segment_v2 { |
29 | | |
30 | 2.41k | Status IndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) { |
31 | 2.41k | std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing |
32 | 2.41k | if (!_inited) { |
33 | 2.31k | _read_buffer_size = read_buffer_size; |
34 | 2.31k | if (_storage_format >= InvertedIndexStorageFormatPB::V2) { |
35 | 2.30k | RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx)); |
36 | 2.30k | } |
37 | 2.29k | _inited = true; |
38 | 2.29k | } |
39 | 2.40k | return Status::OK(); |
40 | 2.41k | } |
41 | | |
42 | 2.30k | Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) { |
43 | 2.30k | auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); |
44 | | |
45 | 2.30k | try { |
46 | 2.30k | CLuceneError err; |
47 | 2.30k | CL_NS(store)::IndexInput* index_input = nullptr; |
48 | | |
49 | | // 1. get file size from meta |
50 | 2.30k | int64_t file_size = -1; |
51 | 2.30k | if (_idx_file_info.has_index_size()) { |
52 | 1.59k | file_size = _idx_file_info.index_size(); |
53 | 1.59k | } |
54 | 2.30k | file_size = file_size == 0 ? -1 : file_size; |
55 | | |
56 | 2.30k | DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { |
57 | 2.30k | if (file_size == -1) { |
58 | 2.30k | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
59 | 2.30k | "CLuceneError occur file size = -1, file is {}", index_file_full_path); |
60 | 2.30k | } |
61 | 2.30k | }) |
62 | | |
63 | 2.30k | DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: " |
64 | 0 | << index_file_full_path; |
65 | | // 2. open file |
66 | 2.30k | auto ok = DorisFSDirectory::FSIndexInput::open( |
67 | 2.30k | _fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size); |
68 | 2.30k | if (!ok) { |
69 | 15 | if (err.number() == CL_ERR_FileNotFound) { |
70 | 14 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
71 | 14 | "inverted index file {} is not found.", index_file_full_path); |
72 | 14 | } else if (err.number() == CL_ERR_EmptyIndexSegment) { |
73 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>( |
74 | 1 | "inverted index file {} is empty.", index_file_full_path); |
75 | 1 | } |
76 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
77 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path, |
78 | 0 | err.what()); |
79 | 15 | } |
80 | 2.28k | _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input); |
81 | 2.28k | _stream->setIoContext(io_ctx); |
82 | 2.28k | _stream->setIndexFile(true); |
83 | | |
84 | | // 3. read file |
85 | 2.28k | int32_t version = _stream->readInt(); // Read version number |
86 | 2.28k | if (version >= InvertedIndexStorageFormatPB::V2) { |
87 | 2.28k | DCHECK(version == _storage_format); |
88 | 2.28k | int32_t numIndices = _stream->readInt(); // Read number of indices |
89 | | |
90 | 69.0k | for (int32_t i = 0; i < numIndices; ++i) { |
91 | 66.7k | int64_t indexId = _stream->readLong(); // Read index ID |
92 | 66.7k | int32_t suffix_length = _stream->readInt(); // Read suffix length |
93 | 66.7k | std::vector<uint8_t> suffix_data(suffix_length); |
94 | 66.7k | _stream->readBytes(suffix_data.data(), suffix_length); |
95 | 66.7k | std::string suffix_str(suffix_data.begin(), suffix_data.end()); |
96 | | |
97 | 66.7k | int32_t numFiles = _stream->readInt(); // Read number of files in the index |
98 | | |
99 | 66.7k | auto fileEntries = std::make_unique<EntriesType>(); |
100 | 66.7k | fileEntries->reserve(numFiles); |
101 | | |
102 | 690k | for (int32_t j = 0; j < numFiles; ++j) { |
103 | 623k | int32_t file_name_length = _stream->readInt(); |
104 | 623k | std::string file_name(file_name_length, '\0'); |
105 | 623k | _stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()), |
106 | 623k | file_name_length); |
107 | 623k | auto entry = std::make_unique<ReaderFileEntry>(); |
108 | 623k | entry->file_name = std::move(file_name); |
109 | 623k | entry->offset = _stream->readLong(); |
110 | 623k | entry->length = _stream->readLong(); |
111 | 623k | fileEntries->emplace(entry->file_name, std::move(entry)); |
112 | 623k | } |
113 | | |
114 | 66.7k | _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)), |
115 | 66.7k | std::move(fileEntries)); |
116 | 66.7k | } |
117 | 2.28k | } else { |
118 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
119 | 0 | "unknown inverted index format {}", version); |
120 | 0 | } |
121 | 2.28k | } catch (CLuceneError& err) { |
122 | 1 | if (_stream != nullptr) { |
123 | 1 | try { |
124 | 1 | _stream->close(); |
125 | 1 | } catch (CLuceneError& err) { |
126 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
127 | 0 | "CLuceneError occur when close idx file {}, error msg: {}", |
128 | 0 | index_file_full_path, err.what()); |
129 | 0 | } |
130 | 1 | } |
131 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
132 | 1 | "CLuceneError occur when init idx file {}, error msg: {}", index_file_full_path, |
133 | 1 | err.what()); |
134 | 1 | } |
135 | 2.28k | return Status::OK(); |
136 | 2.30k | } |
137 | | |
138 | 182 | Result<InvertedIndexDirectoryMap> IndexFileReader::get_all_directories() { |
139 | 182 | InvertedIndexDirectoryMap res; |
140 | 182 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
141 | 1.64k | for (auto& [index, _] : _indices_entries) { |
142 | 1.64k | auto&& [index_id, index_suffix] = index; |
143 | 1.64k | LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix; |
144 | 1.64k | auto ret = _open(index_id, index_suffix); |
145 | 1.64k | if (!ret.has_value()) { |
146 | 0 | return ResultError(ret.error()); |
147 | 0 | } |
148 | 1.64k | res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value())); |
149 | 1.64k | } |
150 | 182 | return res; |
151 | 182 | } |
152 | | |
153 | | Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::_open( |
154 | 4.76k | int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const { |
155 | 4.76k | std::unique_ptr<DorisCompoundReader, DirectoryDeleter> compound_reader; |
156 | | |
157 | 4.76k | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
158 | 15 | auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( |
159 | 15 | _index_path_prefix, index_id, index_suffix); |
160 | 15 | try { |
161 | 15 | CLuceneError err; |
162 | 15 | CL_NS(store)::IndexInput* index_input = nullptr; |
163 | | |
164 | | // 1. get file size from meta |
165 | 15 | int64_t file_size = -1; |
166 | 15 | if (_idx_file_info.index_info_size() > 0) { |
167 | 1 | for (const auto& idx_info : _idx_file_info.index_info()) { |
168 | 1 | if (index_id == idx_info.index_id() && |
169 | 1 | index_suffix == idx_info.index_suffix()) { |
170 | 1 | file_size = idx_info.index_file_size(); |
171 | 1 | break; |
172 | 1 | } |
173 | 1 | } |
174 | 1 | } |
175 | 15 | file_size = file_size == 0 ? -1 : file_size; |
176 | 15 | DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { |
177 | 15 | if (file_size == -1) { |
178 | 15 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
179 | 15 | "CLuceneError occur file size = -1, file is {}", index_file_path)); |
180 | 15 | } |
181 | 15 | }) |
182 | 15 | DCHECK(_fs != nullptr) |
183 | 0 | << "file system is nullptr, index_file_path: " << index_file_path; |
184 | | // 2. open file |
185 | 15 | auto ok = DorisFSDirectory::FSIndexInput::open( |
186 | 15 | _fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size); |
187 | 15 | if (!ok) { |
188 | | // now index_input = nullptr |
189 | 1 | if (err.number() == CL_ERR_FileNotFound) { |
190 | 1 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
191 | 1 | "inverted index file {} is not found.", index_file_path)); |
192 | 1 | } |
193 | 0 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
194 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, |
195 | 0 | err.what())); |
196 | 1 | } |
197 | | |
198 | | // 3. read file in DorisCompoundReader |
199 | 14 | compound_reader.reset(new DorisCompoundReader(index_input, _read_buffer_size)); |
200 | 14 | } catch (CLuceneError& err) { |
201 | 0 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
202 | 0 | "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, |
203 | 0 | err.what())); |
204 | 0 | } |
205 | 4.74k | } else { |
206 | 4.74k | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
207 | 4.74k | if (_stream == nullptr) { |
208 | 1 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
209 | 1 | "CLuceneError occur when open idx file {}, stream is nullptr", |
210 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix))); |
211 | 1 | } |
212 | | |
213 | | // Check if the specified index exists |
214 | 4.74k | auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix)); |
215 | 4.74k | if (index_it == _indices_entries.end()) { |
216 | 4 | std::ostringstream errMsg; |
217 | 4 | errMsg << "No index with id " << index_id << " found"; |
218 | 4 | return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
219 | 4 | "CLuceneError occur when open idx file {}, error msg: {}", |
220 | 4 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix), |
221 | 4 | errMsg.str())); |
222 | 4 | } |
223 | | // Need to clone resource here, because index searcher cache need it. |
224 | 4.74k | compound_reader.reset(new DorisCompoundReader(_stream->clone(), *index_it->second, |
225 | 4.74k | _read_buffer_size, io_ctx)); |
226 | 4.74k | } |
227 | 4.75k | return compound_reader; |
228 | 4.76k | } |
229 | | |
230 | | Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::open( |
231 | 2.50k | const TabletIndex* index_meta, const io::IOContext* io_ctx) const { |
232 | 2.50k | auto index_id = index_meta->index_id(); |
233 | 2.50k | auto index_suffix = index_meta->get_index_suffix(); |
234 | 2.50k | return _open(index_id, index_suffix, io_ctx); |
235 | 2.50k | } |
236 | | |
237 | 331 | std::string IndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const { |
238 | 331 | return InvertedIndexDescriptor::get_index_file_cache_key( |
239 | 331 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
240 | 331 | } |
241 | | |
242 | 1.30k | std::string IndexFileReader::get_index_file_path(const TabletIndex* index_meta) const { |
243 | 1.30k | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
244 | 0 | return InvertedIndexDescriptor::get_index_file_path_v1( |
245 | 0 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
246 | 0 | } |
247 | 1.30k | return InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); |
248 | 1.30k | } |
249 | | |
250 | 1 | Status IndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const { |
251 | 1 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
252 | 0 | auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( |
253 | 0 | _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix()); |
254 | 0 | return _fs->exists(index_file_path, res); |
255 | 1 | } else { |
256 | 1 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
257 | 1 | if (_stream == nullptr) { |
258 | 1 | *res = false; |
259 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
260 | 1 | "idx file {} is not opened", |
261 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)); |
262 | 1 | } |
263 | | // Check if the specified index exists |
264 | 0 | auto index_it = _indices_entries.find( |
265 | 0 | std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); |
266 | 0 | if (index_it == _indices_entries.end()) { |
267 | 0 | *res = false; |
268 | 0 | } else { |
269 | 0 | *res = true; |
270 | 0 | } |
271 | 0 | } |
272 | 0 | return Status::OK(); |
273 | 1 | } |
274 | | |
275 | 5 | Status IndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const { |
276 | 5 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
277 | 1 | *res = true; |
278 | 1 | return Status::OK(); |
279 | 1 | } |
280 | 4 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
281 | 4 | if (_stream == nullptr) { |
282 | 1 | return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( |
283 | 1 | "idx file {} is not opened", |
284 | 1 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)); |
285 | 1 | } |
286 | | // Check if the specified index exists |
287 | 3 | auto index_it = _indices_entries.find( |
288 | 3 | std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); |
289 | 3 | if (index_it == _indices_entries.end()) { |
290 | 1 | *res = false; |
291 | 2 | } else { |
292 | 2 | const auto& entries = index_it->second; |
293 | 2 | auto entry_it = |
294 | 2 | entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name()); |
295 | 2 | if (entry_it == entries->end()) { |
296 | 0 | *res = false; |
297 | 0 | return Status::OK(); |
298 | 0 | } |
299 | 2 | const auto& e = entry_it->second; |
300 | | // roaring bitmap cookie header size is 5 |
301 | 2 | if (e->length <= 5) { |
302 | 1 | *res = false; |
303 | 1 | } else { |
304 | 1 | *res = true; |
305 | 1 | } |
306 | 2 | } |
307 | 3 | return Status::OK(); |
308 | 3 | } |
309 | | |
310 | 1 | void IndexFileReader::debug_file_entries() { |
311 | 1 | std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading |
312 | 1 | for (const auto& index : _indices_entries) { |
313 | 1 | LOG(INFO) << "index_id:" << index.first.first; |
314 | 1 | const auto& index_entries = index.second; |
315 | 2 | for (const auto& entry : *index_entries) { |
316 | 2 | const auto& file_entry = entry.second; |
317 | | LOG(INFO) << "file entry name:" << file_entry->file_name |
318 | 2 | << " length:" << file_entry->length << " offset:" << file_entry->offset; |
319 | 2 | } |
320 | 1 | } |
321 | 1 | } |
322 | | |
323 | | } // namespace doris::segment_v2 |