be/src/storage/index/index_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/index/index_file_writer.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <filesystem> |
24 | | |
25 | | #include "common/status.h" |
26 | | #include "io/fs/packed_file_writer.h" |
27 | | #include "io/fs/s3_file_writer.h" |
28 | | #include "io/fs/stream_sink_file_writer.h" |
29 | | #include "storage/index/ann/ann_index_files.h" |
30 | | #include "storage/index/index_file_reader.h" |
31 | | #include "storage/index/index_storage_format_v1.h" |
32 | | #include "storage/index/index_storage_format_v2.h" |
33 | | #include "storage/index/inverted/inverted_index_compound_reader.h" |
34 | | #include "storage/index/inverted/inverted_index_desc.h" |
35 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
36 | | #include "storage/index/inverted/inverted_index_reader.h" |
37 | | #include "storage/tablet/tablet_schema.h" |
38 | | |
39 | | namespace doris::segment_v2 { |
40 | | |
41 | | IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix, |
42 | | std::string rowset_id, int64_t seg_id, |
43 | | InvertedIndexStorageFormatPB storage_format, |
44 | | io::FileWriterPtr file_writer, bool can_use_ram_dir) |
45 | 444 | : _fs(std::move(fs)), |
46 | 444 | _index_path_prefix(std::move(index_path_prefix)), |
47 | 444 | _rowset_id(std::move(rowset_id)), |
48 | 444 | _seg_id(seg_id), |
49 | 444 | _storage_format(storage_format), |
50 | 444 | _local_fs(io::global_local_filesystem()), |
51 | 444 | _idx_v2_writer(std::move(file_writer)), |
52 | 444 | _can_use_ram_dir(can_use_ram_dir) { |
53 | 444 | auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); |
54 | 444 | _tmp_dir = tmp_file_dir.native(); |
55 | 444 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
56 | 33 | _index_storage_format = std::make_unique<IndexStorageFormatV1>(this); |
57 | 411 | } else { |
58 | 411 | _index_storage_format = std::make_unique<IndexStorageFormatV2>(this); |
59 | 411 | } |
60 | 444 | } |
61 | | |
62 | 16 | Status IndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { |
63 | 16 | _indices_dirs = std::move(indices_dirs); |
64 | 16 | return Status::OK(); |
65 | 16 | } |
66 | | |
67 | | Status IndexFileWriter::_insert_directory_into_map(int64_t index_id, |
68 | | const std::string& index_suffix, |
69 | 2.75k | std::shared_ptr<DorisFSDirectory> dir) { |
70 | 2.75k | auto key = std::make_pair(index_id, index_suffix); |
71 | 2.75k | auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir)); |
72 | 2.75k | if (!inserted) { |
73 | 1 | LOG(ERROR) << "IndexFileWriter::open attempted to insert a duplicate key: (" << key.first |
74 | 1 | << ", " << key.second << ")"; |
75 | 1 | LOG(ERROR) << "Directories already in map: "; |
76 | 1 | for (const auto& entry : _indices_dirs) { |
77 | 1 | LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")"; |
78 | 1 | } |
79 | 1 | return Status::InternalError("IndexFileWriter::open attempted to insert a duplicate dir"); |
80 | 1 | } |
81 | 2.75k | return Status::OK(); |
82 | 2.75k | } |
83 | | |
84 | 2.74k | Result<std::shared_ptr<DorisFSDirectory>> IndexFileWriter::open(const TabletIndex* index_meta) { |
85 | 2.74k | auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( |
86 | 2.74k | _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix()); |
87 | 2.74k | auto dir = std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory( |
88 | 2.74k | _local_fs, local_fs_index_path.c_str(), _can_use_ram_dir)); |
89 | 2.74k | auto st = |
90 | 2.74k | _insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir); |
91 | 2.74k | if (!st.ok()) { |
92 | 0 | return ResultError(st); |
93 | 0 | } |
94 | | |
95 | 2.74k | return dir; |
96 | 2.74k | } |
97 | | |
98 | 6 | Status IndexFileWriter::delete_index(const TabletIndex* index_meta) { |
99 | 6 | DBUG_EXECUTE_IF("IndexFileWriter::delete_index_index_meta_nullptr", { index_meta = nullptr; }); |
100 | 6 | if (!index_meta) { |
101 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); |
102 | 1 | } |
103 | | |
104 | 5 | auto index_id = index_meta->index_id(); |
105 | 5 | const auto& index_suffix = index_meta->get_index_suffix(); |
106 | | |
107 | | // Check if the specified index exists |
108 | 5 | auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); |
109 | 5 | DBUG_EXECUTE_IF("IndexFileWriter::delete_index_indices_dirs_reach_end", |
110 | 5 | { index_it = _indices_dirs.end(); }) |
111 | 5 | if (index_it == _indices_dirs.end()) { |
112 | 1 | std::ostringstream errMsg; |
113 | 1 | errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix |
114 | 1 | << " found."; |
115 | 1 | LOG(WARNING) << errMsg.str(); |
116 | 1 | return Status::OK(); |
117 | 1 | } |
118 | | |
119 | 4 | _indices_dirs.erase(index_it); |
120 | 4 | return Status::OK(); |
121 | 5 | } |
122 | | |
123 | 8 | Status IndexFileWriter::add_into_searcher_cache() { |
124 | 8 | auto index_file_reader = |
125 | 8 | std::make_unique<IndexFileReader>(_fs, _index_path_prefix, _storage_format); |
126 | 8 | auto st = index_file_reader->init(); |
127 | 8 | if (!st.ok()) { |
128 | 1 | if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr) { |
129 | | // StreamSinkFileWriter not found file is normal. |
130 | 0 | return Status::OK(); |
131 | 0 | } |
132 | 1 | if (dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) { |
133 | | // PackedFileWriter: file may be merged, skip cache for now. |
134 | | // The cache will be populated on first read. |
135 | 0 | return Status::OK(); |
136 | 0 | } |
137 | 1 | LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for " << _index_path_prefix |
138 | 1 | << ", error " << st.msg(); |
139 | 1 | return st; |
140 | 1 | } |
141 | 7 | for (const auto& entry : _indices_dirs) { |
142 | 7 | auto index_meta = entry.first; |
143 | 7 | auto dir = DORIS_TRY(index_file_reader->_open(index_meta.first, index_meta.second)); |
144 | 7 | std::vector<std::string> file_names; |
145 | 7 | dir->list(&file_names); |
146 | 7 | if (file_names.size() == 1 && (file_names[0] == faiss_index_fila_name)) { |
147 | 0 | continue; |
148 | 0 | } |
149 | 7 | auto index_file_key = InvertedIndexDescriptor::get_index_file_cache_key( |
150 | 7 | _index_path_prefix, index_meta.first, index_meta.second); |
151 | 7 | InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key); |
152 | 7 | InvertedIndexCacheHandle inverted_index_cache_handle; |
153 | 7 | if (InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key, |
154 | 7 | &inverted_index_cache_handle)) { |
155 | 1 | st = InvertedIndexSearcherCache::instance()->erase(searcher_cache_key.index_file_path); |
156 | 1 | if (!st.ok()) { |
157 | 0 | LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for " |
158 | 0 | << _index_path_prefix << ", error " << st.msg(); |
159 | 0 | } |
160 | 1 | } |
161 | 7 | IndexSearcherPtr searcher; |
162 | 7 | size_t reader_size = 0; |
163 | 7 | auto index_searcher_builder = DORIS_TRY(_construct_index_searcher_builder(dir.get())); |
164 | 7 | RETURN_IF_ERROR(InvertedIndexReader::create_index_searcher( |
165 | 7 | index_searcher_builder.get(), dir.get(), &searcher, reader_size)); |
166 | 7 | auto* cache_value = new InvertedIndexSearcherCache::CacheValue(std::move(searcher), |
167 | 7 | reader_size, UnixMillis()); |
168 | 7 | InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value); |
169 | 7 | } |
170 | 7 | return Status::OK(); |
171 | 7 | } |
172 | | |
173 | | Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_searcher_builder( |
174 | 0 | const DorisCompoundReader* dir) { |
175 | 0 | std::vector<std::string> files; |
176 | 0 | dir->list(&files); |
177 | 0 | auto reader_type = InvertedIndexReaderType::FULLTEXT; |
178 | 0 | bool found_bkd = std::any_of(files.begin(), files.end(), [](const std::string& file) { |
179 | 0 | return file == InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name(); |
180 | 0 | }); |
181 | 0 | if (found_bkd) { |
182 | 0 | reader_type = InvertedIndexReaderType::BKD; |
183 | 0 | } |
184 | 0 | return IndexSearcherBuilder::create_index_searcher_builder(reader_type); |
185 | 0 | } |
186 | | |
187 | 396 | Status IndexFileWriter::begin_close() { |
188 | 396 | DCHECK(!_closed) << debug_string(); |
189 | 396 | _closed = true; |
190 | 396 | if (_indices_dirs.empty()) { |
191 | | // An empty file must still be created even if there are no indexes to write |
192 | 9 | if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr || |
193 | 9 | dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr || |
194 | 9 | dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) { |
195 | 2 | return _idx_v2_writer->close(true); |
196 | 2 | } |
197 | 7 | return Status::OK(); |
198 | 9 | } |
199 | 387 | DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", { |
200 | 387 | if (_storage_format != InvertedIndexStorageFormatPB::V2) { |
201 | 387 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
202 | 387 | "IndexFileWriter::close fault injection:inverted index storage format " |
203 | 387 | "must be v2"); |
204 | 387 | } |
205 | 387 | }) |
206 | 387 | try { |
207 | 387 | RETURN_IF_ERROR(_index_storage_format->write()); |
208 | 2.72k | for (const auto& entry : _indices_dirs) { |
209 | 2.72k | const auto& dir = entry.second; |
210 | | // delete index path, which contains separated inverted index files |
211 | 2.72k | if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) { |
212 | 90 | auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get()); |
213 | 90 | compound_dir->deleteDirectory(); |
214 | 90 | } |
215 | 2.72k | } |
216 | 380 | } catch (CLuceneError& err) { |
217 | 0 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
218 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
219 | 0 | "CLuceneError occur when close, error msg: {}", err.what()); |
220 | 0 | } else { |
221 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
222 | 0 | "CLuceneError occur when close idx file {}, error msg: {}", |
223 | 0 | InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix), |
224 | 0 | err.what()); |
225 | 0 | } |
226 | 0 | } |
227 | 380 | return Status::OK(); |
228 | 387 | } |
229 | | |
230 | 381 | Status IndexFileWriter::finish_close() { |
231 | 381 | DCHECK(_closed) << debug_string(); |
232 | 381 | if (_indices_dirs.empty()) { |
233 | | // An empty file must still be created even if there are no indexes to write |
234 | 8 | if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr || |
235 | 8 | dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr || |
236 | 8 | dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) { |
237 | 2 | return _idx_v2_writer->close(false); |
238 | 2 | } |
239 | 6 | return Status::OK(); |
240 | 8 | } |
241 | 373 | if (_idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED) { |
242 | 352 | RETURN_IF_ERROR(_idx_v2_writer->close(false)); |
243 | 352 | } |
244 | 373 | LOG_INFO("IndexFileWriter finish_close, enable_write_index_searcher_cache: {}", |
245 | 373 | config::enable_write_index_searcher_cache); |
246 | 373 | Status st = Status::OK(); |
247 | 373 | if (config::enable_write_index_searcher_cache) { |
248 | 7 | st = add_into_searcher_cache(); |
249 | 7 | } |
250 | 373 | _indices_dirs.clear(); |
251 | 373 | return st; |
252 | 373 | } |
253 | | |
254 | 3 | std::vector<std::string> IndexFileWriter::get_index_file_names() const { |
255 | 3 | std::vector<std::string> file_names; |
256 | 3 | if (_storage_format == InvertedIndexStorageFormatPB::V1) { |
257 | 2 | if (_closed && _file_info.index_info_size() > 0) { |
258 | 1 | for (const auto& index_info : _file_info.index_info()) { |
259 | 1 | file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1( |
260 | 1 | _rowset_id, _seg_id, index_info.index_id(), index_info.index_suffix())); |
261 | 1 | } |
262 | 1 | } else { |
263 | 2 | for (const auto& [index_info, _] : _indices_dirs) { |
264 | 2 | file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1( |
265 | 2 | _rowset_id, _seg_id, index_info.first, index_info.second)); |
266 | 2 | } |
267 | 1 | } |
268 | 2 | } else { |
269 | 1 | file_names.emplace_back( |
270 | 1 | InvertedIndexDescriptor::get_index_file_name_v2(_rowset_id, _seg_id)); |
271 | 1 | } |
272 | 3 | return file_names; |
273 | 3 | } |
274 | | |
275 | 0 | std::string IndexFileWriter::debug_string() const { |
276 | 0 | std::stringstream indices_dirs; |
277 | 0 | for (const auto& [index, dir] : _indices_dirs) { |
278 | 0 | indices_dirs << "index id is: " << index.first << " , index suffix is: " << index.second |
279 | 0 | << " , index dir is: " << dir->toString(); |
280 | 0 | } |
281 | 0 | return fmt::format( |
282 | 0 | "inverted index file writer debug string: index storage format is: {}, index path " |
283 | 0 | "prefix is: {}, rowset id is: {}, seg id is: {}, closed is: {}, total file size " |
284 | 0 | "is: {}, index dirs is: {}", |
285 | 0 | _storage_format, _index_path_prefix, _rowset_id, _seg_id, _closed, _total_file_size, |
286 | 0 | indices_dirs.str()); |
287 | 0 | } |
288 | | |
289 | | } // namespace doris::segment_v2 |