Coverage Report

Created: 2026-03-16 05:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/index_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/index_file_writer.h"
19
20
#include <glog/logging.h>
21
22
#include <atomic>
23
#include <filesystem>
24
25
#include "common/status.h"
26
#include "io/fs/packed_file_writer.h"
27
#include "io/fs/s3_file_writer.h"
28
#include "io/fs/stream_sink_file_writer.h"
29
#include "storage/index/ann/ann_index_files.h"
30
#include "storage/index/index_file_reader.h"
31
#include "storage/index/index_storage_format_v1.h"
32
#include "storage/index/index_storage_format_v2.h"
33
#include "storage/index/inverted/inverted_index_compound_reader.h"
34
#include "storage/index/inverted/inverted_index_desc.h"
35
#include "storage/index/inverted/inverted_index_fs_directory.h"
36
#include "storage/index/inverted/inverted_index_reader.h"
37
#include "storage/tablet/tablet_schema.h"
38
39
namespace doris::segment_v2 {
40
41
IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix,
42
                                 std::string rowset_id, int64_t seg_id,
43
                                 InvertedIndexStorageFormatPB storage_format,
44
                                 io::FileWriterPtr file_writer, bool can_use_ram_dir)
45
444
        : _fs(std::move(fs)),
46
444
          _index_path_prefix(std::move(index_path_prefix)),
47
444
          _rowset_id(std::move(rowset_id)),
48
444
          _seg_id(seg_id),
49
444
          _storage_format(storage_format),
50
444
          _local_fs(io::global_local_filesystem()),
51
444
          _idx_v2_writer(std::move(file_writer)),
52
444
          _can_use_ram_dir(can_use_ram_dir) {
53
444
    auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
54
444
    _tmp_dir = tmp_file_dir.native();
55
444
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
56
33
        _index_storage_format = std::make_unique<IndexStorageFormatV1>(this);
57
411
    } else {
58
411
        _index_storage_format = std::make_unique<IndexStorageFormatV2>(this);
59
411
    }
60
444
}
61
62
16
Status IndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) {
63
16
    _indices_dirs = std::move(indices_dirs);
64
16
    return Status::OK();
65
16
}
66
67
Status IndexFileWriter::_insert_directory_into_map(int64_t index_id,
68
                                                   const std::string& index_suffix,
69
2.75k
                                                   std::shared_ptr<DorisFSDirectory> dir) {
70
2.75k
    auto key = std::make_pair(index_id, index_suffix);
71
2.75k
    auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir));
72
2.75k
    if (!inserted) {
73
1
        LOG(ERROR) << "IndexFileWriter::open attempted to insert a duplicate key: (" << key.first
74
1
                   << ", " << key.second << ")";
75
1
        LOG(ERROR) << "Directories already in map: ";
76
1
        for (const auto& entry : _indices_dirs) {
77
1
            LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")";
78
1
        }
79
1
        return Status::InternalError("IndexFileWriter::open attempted to insert a duplicate dir");
80
1
    }
81
2.75k
    return Status::OK();
82
2.75k
}
83
84
2.74k
Result<std::shared_ptr<DorisFSDirectory>> IndexFileWriter::open(const TabletIndex* index_meta) {
85
2.74k
    auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
86
2.74k
            _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix());
87
2.74k
    auto dir = std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory(
88
2.74k
            _local_fs, local_fs_index_path.c_str(), _can_use_ram_dir));
89
2.74k
    auto st =
90
2.74k
            _insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir);
91
2.74k
    if (!st.ok()) {
92
0
        return ResultError(st);
93
0
    }
94
95
2.74k
    return dir;
96
2.74k
}
97
98
6
Status IndexFileWriter::delete_index(const TabletIndex* index_meta) {
99
6
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_index_meta_nullptr", { index_meta = nullptr; });
100
6
    if (!index_meta) {
101
1
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null.");
102
1
    }
103
104
5
    auto index_id = index_meta->index_id();
105
5
    const auto& index_suffix = index_meta->get_index_suffix();
106
107
    // Check if the specified index exists
108
5
    auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix));
109
5
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_indices_dirs_reach_end",
110
5
                    { index_it = _indices_dirs.end(); })
111
5
    if (index_it == _indices_dirs.end()) {
112
1
        std::ostringstream errMsg;
113
1
        errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix
114
1
               << " found.";
115
1
        LOG(WARNING) << errMsg.str();
116
1
        return Status::OK();
117
1
    }
118
119
4
    _indices_dirs.erase(index_it);
120
4
    return Status::OK();
121
5
}
122
123
8
Status IndexFileWriter::add_into_searcher_cache() {
124
8
    auto index_file_reader =
125
8
            std::make_unique<IndexFileReader>(_fs, _index_path_prefix, _storage_format);
126
8
    auto st = index_file_reader->init();
127
8
    if (!st.ok()) {
128
1
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr) {
129
            // StreamSinkFileWriter not found file is normal.
130
0
            return Status::OK();
131
0
        }
132
1
        if (dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
133
            // PackedFileWriter: file may be merged, skip cache for now.
134
            // The cache will be populated on first read.
135
0
            return Status::OK();
136
0
        }
137
1
        LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for " << _index_path_prefix
138
1
                     << ", error " << st.msg();
139
1
        return st;
140
1
    }
141
7
    for (const auto& entry : _indices_dirs) {
142
7
        auto index_meta = entry.first;
143
7
        auto dir = DORIS_TRY(index_file_reader->_open(index_meta.first, index_meta.second));
144
7
        std::vector<std::string> file_names;
145
7
        dir->list(&file_names);
146
7
        if (file_names.size() == 1 && (file_names[0] == faiss_index_fila_name)) {
147
0
            continue;
148
0
        }
149
7
        auto index_file_key = InvertedIndexDescriptor::get_index_file_cache_key(
150
7
                _index_path_prefix, index_meta.first, index_meta.second);
151
7
        InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
152
7
        InvertedIndexCacheHandle inverted_index_cache_handle;
153
7
        if (InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
154
7
                                                           &inverted_index_cache_handle)) {
155
1
            st = InvertedIndexSearcherCache::instance()->erase(searcher_cache_key.index_file_path);
156
1
            if (!st.ok()) {
157
0
                LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for "
158
0
                             << _index_path_prefix << ", error " << st.msg();
159
0
            }
160
1
        }
161
7
        IndexSearcherPtr searcher;
162
7
        size_t reader_size = 0;
163
7
        auto index_searcher_builder = DORIS_TRY(_construct_index_searcher_builder(dir.get()));
164
7
        RETURN_IF_ERROR(InvertedIndexReader::create_index_searcher(
165
7
                index_searcher_builder.get(), dir.get(), &searcher, reader_size));
166
7
        auto* cache_value = new InvertedIndexSearcherCache::CacheValue(std::move(searcher),
167
7
                                                                       reader_size, UnixMillis());
168
7
        InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value);
169
7
    }
170
7
    return Status::OK();
171
7
}
172
173
Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_searcher_builder(
174
0
        const DorisCompoundReader* dir) {
175
0
    std::vector<std::string> files;
176
0
    dir->list(&files);
177
0
    auto reader_type = InvertedIndexReaderType::FULLTEXT;
178
0
    bool found_bkd = std::any_of(files.begin(), files.end(), [](const std::string& file) {
179
0
        return file == InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name();
180
0
    });
181
0
    if (found_bkd) {
182
0
        reader_type = InvertedIndexReaderType::BKD;
183
0
    }
184
0
    return IndexSearcherBuilder::create_index_searcher_builder(reader_type);
185
0
}
186
187
396
Status IndexFileWriter::begin_close() {
188
396
    DCHECK(!_closed) << debug_string();
189
396
    _closed = true;
190
396
    if (_indices_dirs.empty()) {
191
        // An empty file must still be created even if there are no indexes to write
192
9
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
193
9
            dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
194
9
            dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
195
2
            return _idx_v2_writer->close(true);
196
2
        }
197
7
        return Status::OK();
198
9
    }
199
387
    DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", {
200
387
        if (_storage_format != InvertedIndexStorageFormatPB::V2) {
201
387
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
202
387
                    "IndexFileWriter::close fault injection:inverted index storage format "
203
387
                    "must be v2");
204
387
        }
205
387
    })
206
387
    try {
207
387
        RETURN_IF_ERROR(_index_storage_format->write());
208
2.72k
        for (const auto& entry : _indices_dirs) {
209
2.72k
            const auto& dir = entry.second;
210
            // delete index path, which contains separated inverted index files
211
2.72k
            if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
212
90
                auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get());
213
90
                compound_dir->deleteDirectory();
214
90
            }
215
2.72k
        }
216
380
    } catch (CLuceneError& err) {
217
0
        if (_storage_format == InvertedIndexStorageFormatPB::V1) {
218
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
219
0
                    "CLuceneError occur when close, error msg: {}", err.what());
220
0
        } else {
221
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
222
0
                    "CLuceneError occur when close idx file {}, error msg: {}",
223
0
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix),
224
0
                    err.what());
225
0
        }
226
0
    }
227
380
    return Status::OK();
228
387
}
229
230
381
Status IndexFileWriter::finish_close() {
231
381
    DCHECK(_closed) << debug_string();
232
381
    if (_indices_dirs.empty()) {
233
        // An empty file must still be created even if there are no indexes to write
234
8
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
235
8
            dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
236
8
            dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
237
2
            return _idx_v2_writer->close(false);
238
2
        }
239
6
        return Status::OK();
240
8
    }
241
373
    if (_idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED) {
242
352
        RETURN_IF_ERROR(_idx_v2_writer->close(false));
243
352
    }
244
373
    LOG_INFO("IndexFileWriter finish_close, enable_write_index_searcher_cache: {}",
245
373
             config::enable_write_index_searcher_cache);
246
373
    Status st = Status::OK();
247
373
    if (config::enable_write_index_searcher_cache) {
248
7
        st = add_into_searcher_cache();
249
7
    }
250
373
    _indices_dirs.clear();
251
373
    return st;
252
373
}
253
254
3
std::vector<std::string> IndexFileWriter::get_index_file_names() const {
255
3
    std::vector<std::string> file_names;
256
3
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
257
2
        if (_closed && _file_info.index_info_size() > 0) {
258
1
            for (const auto& index_info : _file_info.index_info()) {
259
1
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
260
1
                        _rowset_id, _seg_id, index_info.index_id(), index_info.index_suffix()));
261
1
            }
262
1
        } else {
263
2
            for (const auto& [index_info, _] : _indices_dirs) {
264
2
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
265
2
                        _rowset_id, _seg_id, index_info.first, index_info.second));
266
2
            }
267
1
        }
268
2
    } else {
269
1
        file_names.emplace_back(
270
1
                InvertedIndexDescriptor::get_index_file_name_v2(_rowset_id, _seg_id));
271
1
    }
272
3
    return file_names;
273
3
}
274
275
0
std::string IndexFileWriter::debug_string() const {
276
0
    std::stringstream indices_dirs;
277
0
    for (const auto& [index, dir] : _indices_dirs) {
278
0
        indices_dirs << "index id is: " << index.first << " , index suffix is: " << index.second
279
0
                     << " , index dir is: " << dir->toString();
280
0
    }
281
0
    return fmt::format(
282
0
            "inverted index file writer debug string: index storage format is: {}, index path "
283
0
            "prefix is: {}, rowset id is: {}, seg id is: {}, closed is: {}, total file size "
284
0
            "is: {}, index dirs is: {}",
285
0
            _storage_format, _index_path_prefix, _rowset_id, _seg_id, _closed, _total_file_size,
286
0
            indices_dirs.str());
287
0
}
288
289
} // namespace doris::segment_v2