Coverage Report

Created: 2026-03-31 00:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/index_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/index_file_writer.h"
19
20
#include <glog/logging.h>
21
22
#include <atomic>
23
#include <filesystem>
24
25
#include "common/status.h"
26
#include "io/fs/packed_file_writer.h"
27
#include "io/fs/s3_file_writer.h"
28
#include "io/fs/stream_sink_file_writer.h"
29
#include "storage/index/ann/ann_index_files.h"
30
#include "storage/index/index_file_reader.h"
31
#include "storage/index/index_storage_format_v1.h"
32
#include "storage/index/index_storage_format_v2.h"
33
#include "storage/index/inverted/inverted_index_compound_reader.h"
34
#include "storage/index/inverted/inverted_index_desc.h"
35
#include "storage/index/inverted/inverted_index_fs_directory.h"
36
#include "storage/index/inverted/inverted_index_reader.h"
37
#include "storage/tablet/tablet_schema.h"
38
39
namespace doris::segment_v2 {
40
41
// Builds a writer for one segment's index files.
//
// All ownership-carrying arguments (fs, path prefix, rowset id, file writer) are moved into
// members. The temporary directory that open() later uses for per-index scratch directories
// is picked here from the process-wide tmp file dirs, and the storage-format strategy object
// is chosen from `storage_format`: V1 gets IndexStorageFormatV1, anything else gets V2.
IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix,
                                 std::string rowset_id, int64_t seg_id,
                                 InvertedIndexStorageFormatPB storage_format,
                                 io::FileWriterPtr file_writer, bool can_use_ram_dir,
                                 int64_t tablet_id)
        : _fs(std::move(fs)),
          _index_path_prefix(std::move(index_path_prefix)),
          _rowset_id(std::move(rowset_id)),
          _seg_id(seg_id),
          _storage_format(storage_format),
          _local_fs(io::global_local_filesystem()),
          _idx_v2_writer(std::move(file_writer)),
          _can_use_ram_dir(can_use_ram_dir),
          _tablet_id(tablet_id) {
    // Root for the temporary per-index directories created by open().
    _tmp_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native();

    if (_storage_format != InvertedIndexStorageFormatPB::V1) {
        // Every non-V1 format is served by the V2 storage strategy.
        _index_storage_format = std::make_unique<IndexStorageFormatV2>(this);
    } else {
        _index_storage_format = std::make_unique<IndexStorageFormatV1>(this);
    }
}
63
64
16
// Adopts an externally prepared set of index directories.
//
// The contents of `indices_dirs` are moved into this writer, replacing whatever it held;
// the caller's map is left in a moved-from state and should not be relied on afterwards.
// Always returns OK.
Status IndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) {
    _indices_dirs = std::move(indices_dirs);
    return Status::OK();
}
68
69
// Registers `dir` under the key (index_id, index_suffix).
//
// Returns OK when the key was new. If the key is already present, the existing entry is kept,
// the duplicate key and every key currently in the map are logged at ERROR level, and an
// InternalError is returned.
Status IndexFileWriter::_insert_directory_into_map(int64_t index_id,
                                                   const std::string& index_suffix,
                                                   std::shared_ptr<DorisFSDirectory> dir) {
    auto key = std::make_pair(index_id, index_suffix);
    const bool inserted = _indices_dirs.emplace(key, std::move(dir)).second;
    if (inserted) {
        return Status::OK();
    }

    LOG(ERROR) << "IndexFileWriter::open attempted to insert a duplicate key: (" << key.first
               << ", " << key.second << ")";
    LOG(ERROR) << "Directories already in map: ";
    for (const auto& [existing_key, existing_dir] : _indices_dirs) {
        LOG(ERROR) << "Key: (" << existing_key.first << ", " << existing_key.second << ")";
    }
    return Status::InternalError("IndexFileWriter::open attempted to insert a duplicate dir");
}
85
86
2.74k
// Creates the temporary local directory that an index will be written into.
//
// The directory path is derived from the writer's tmp dir, rowset id, segment id and the
// index's id/suffix. DorisFSDirectoryFactory may back it with RAM when `_can_use_ram_dir`
// is set. The directory is registered in `_indices_dirs`; if an entry with the same
// (index_id, suffix) already exists, the registration error is returned instead.
Result<std::shared_ptr<DorisFSDirectory>> IndexFileWriter::open(const TabletIndex* index_meta) {
    const auto index_id = index_meta->index_id();
    const auto& index_suffix = index_meta->get_index_suffix();

    const auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
            _tmp_dir, _rowset_id, _seg_id, index_id, index_suffix);
    std::shared_ptr<DorisFSDirectory> dir(DorisFSDirectoryFactory::getDirectory(
            _local_fs, local_fs_index_path.c_str(), _can_use_ram_dir));

    if (auto st = _insert_directory_into_map(index_id, index_suffix, dir); !st.ok()) {
        return ResultError(st);
    }
    return dir;
}
99
100
7
// Drops the directory registered for `index_meta` so it is not written at close.
//
// Returns INVALID_ARGUMENT when `index_meta` is null. A missing (id, suffix) entry is not an
// error: a warning is logged and OK is returned. Two debug-point hooks can force the
// null-metadata and not-found paths for fault-injection tests.
Status IndexFileWriter::delete_index(const TabletIndex* index_meta) {
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_index_meta_nullptr", { index_meta = nullptr; });
    if (index_meta == nullptr) {
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null.");
    }

    const auto index_id = index_meta->index_id();
    const auto& index_suffix = index_meta->get_index_suffix();

    auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix));
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_indices_dirs_reach_end",
                    { index_it = _indices_dirs.end(); })
    if (index_it != _indices_dirs.end()) {
        _indices_dirs.erase(index_it);
        return Status::OK();
    }

    // Deleting an index that was never opened is tolerated: warn and report success.
    LOG(WARNING) << "No inverted index with id " << index_id << " and suffix " << index_suffix
                 << " found.";
    return Status::OK();
}
124
125
8
// Warms the inverted-index searcher cache with searchers for the indexes just written.
//
// Re-opens the written index file through an IndexFileReader. If that fails because the file
// is not readable locally yet, OK is returned for StreamSinkFileWriter and PackedFileWriter
// (the cache is then populated on first read); for any other writer the error is logged and
// returned.
//
// For each registered index, any stale cache entry with the same key is erased and a freshly
// built searcher is inserted. A directory containing only the faiss index file is skipped.
// Errors while opening a directory or building a searcher are returned to the caller.
Status IndexFileWriter::add_into_searcher_cache() {
    auto index_file_reader = std::make_unique<IndexFileReader>(
            _fs, _index_path_prefix, _storage_format, InvertedIndexFileInfo(), _tablet_id);
    auto st = index_file_reader->init();
    if (!st.ok()) {
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr) {
            // StreamSinkFileWriter not found file is normal.
            return Status::OK();
        }
        if (dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
            // PackedFileWriter: file may be merged, skip cache for now.
            // The cache will be populated on first read.
            return Status::OK();
        }
        LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for " << _index_path_prefix
                     << ", error " << st.msg();
        return st;
    }

    auto* searcher_cache = InvertedIndexSearcherCache::instance();
    for (const auto& entry : _indices_dirs) {
        // Bind the (index_id, index_suffix) key by reference: it owns a std::string, and the
        // previous by-value copy allocated on every iteration for no benefit.
        const auto& index_key = entry.first;
        auto dir = DORIS_TRY(index_file_reader->_open(index_key.first, index_key.second));
        std::vector<std::string> file_names;
        dir->list(&file_names);
        // A directory holding only the faiss file has no inverted-index searcher to build
        // (presumably an ANN index).
        if (file_names.size() == 1 && (file_names[0] == faiss_index_fila_name)) {
            continue;
        }

        auto index_file_key = InvertedIndexDescriptor::get_index_file_cache_key(
                _index_path_prefix, index_key.first, index_key.second);
        InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
        InvertedIndexCacheHandle inverted_index_cache_handle;
        // Evict any searcher cached under the same key so the insert below replaces it.
        if (searcher_cache->lookup(searcher_cache_key, &inverted_index_cache_handle)) {
            st = searcher_cache->erase(searcher_cache_key.index_file_path);
            if (!st.ok()) {
                LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for "
                             << _index_path_prefix << ", error " << st.msg();
            }
        }

        IndexSearcherPtr searcher;
        size_t reader_size = 0;
        auto index_searcher_builder = DORIS_TRY(_construct_index_searcher_builder(dir.get()));
        RETURN_IF_ERROR(InvertedIndexReader::create_index_searcher(
                index_searcher_builder.get(), dir.get(), &searcher, reader_size));
        // Ownership of the cache value passes to the searcher cache on insert.
        auto* cache_value = new InvertedIndexSearcherCache::CacheValue(std::move(searcher),
                                                                       reader_size, UnixMillis());
        searcher_cache->insert(searcher_cache_key, cache_value);
    }
    return Status::OK();
}
174
175
// Chooses the searcher builder for an index directory by inspecting its file list.
//
// If the directory contains the temporary BKD data file, a BKD builder is created; otherwise
// a FULLTEXT builder is created.
Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_searcher_builder(
        const DorisCompoundReader* dir) {
    std::vector<std::string> file_names;
    dir->list(&file_names);
    const bool has_bkd_data =
            std::any_of(file_names.cbegin(), file_names.cend(), [](const std::string& name) {
                return name == InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name();
            });
    return IndexSearcherBuilder::create_index_searcher_builder(
            has_bkd_data ? InvertedIndexReaderType::BKD : InvertedIndexReaderType::FULLTEXT);
}
188
189
397
// First phase of closing: serializes all registered indexes via the storage format.
//
// Marks the writer closed (DCHECK-enforced to happen once). With no indexes registered,
// stream-sink, S3 and packed writers are closed asynchronously (`close(true)`) so an empty
// file still exists; other writers need nothing. Otherwise the storage format writes the
// indexes, and the scratch directory of every plain DorisFSDirectory is deleted afterwards.
// A CLuceneError thrown during this is converted into INVERTED_INDEX_CLUCENE_ERROR.
Status IndexFileWriter::begin_close() {
    DCHECK(!_closed) << debug_string();
    _closed = true;

    if (_indices_dirs.empty()) {
        // An empty file must still be created even if there are no indexes to write
        auto* writer = _idx_v2_writer.get();
        const bool needs_empty_file = dynamic_cast<io::StreamSinkFileWriter*>(writer) != nullptr ||
                                      dynamic_cast<io::S3FileWriter*>(writer) != nullptr ||
                                      dynamic_cast<io::PackedFileWriter*>(writer) != nullptr;
        return needs_empty_file ? _idx_v2_writer->close(true) : Status::OK();
    }

    DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", {
        if (_storage_format != InvertedIndexStorageFormatPB::V2) {
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                    "IndexFileWriter::close fault injection:inverted index storage format "
                    "must be v2");
        }
    })

    try {
        RETURN_IF_ERROR(_index_storage_format->write());
        for (const auto& [index_key, dir] : _indices_dirs) {
            // delete index path, which contains separated inverted index files
            // (only on-disk DorisFSDirectory instances have a path to remove)
            if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
                static_cast<DorisFSDirectory*>(dir.get())->deleteDirectory();
            }
        }
    } catch (CLuceneError& err) {
        if (_storage_format == InvertedIndexStorageFormatPB::V1) {
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                    "CLuceneError occur when close, error msg: {}", err.what());
        }
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                "CLuceneError occur when close idx file {}, error msg: {}",
                InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix), err.what());
    }
    return Status::OK();
}
231
232
382
// Second phase of closing: finalizes the output file and optionally warms the searcher cache.
//
// Must follow begin_close() (DCHECK-enforced). With no indexes registered, stream-sink, S3
// and packed writers are closed synchronously (`close(false)`) and their status is returned.
// Otherwise the output writer is closed if it is not already CLOSED; a failure there is
// returned immediately. Then, if config::enable_write_index_searcher_cache is on, the cache
// is populated. The registered directories are always cleared, and the cache step's status
// is returned.
Status IndexFileWriter::finish_close() {
    DCHECK(_closed) << debug_string();

    if (_indices_dirs.empty()) {
        // An empty file must still be created even if there are no indexes to write
        auto* writer = _idx_v2_writer.get();
        const bool needs_empty_file = dynamic_cast<io::StreamSinkFileWriter*>(writer) != nullptr ||
                                      dynamic_cast<io::S3FileWriter*>(writer) != nullptr ||
                                      dynamic_cast<io::PackedFileWriter*>(writer) != nullptr;
        return needs_empty_file ? _idx_v2_writer->close(false) : Status::OK();
    }

    const bool writer_still_open =
            _idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED;
    if (writer_still_open) {
        RETURN_IF_ERROR(_idx_v2_writer->close(false));
    }

    Status cache_status = config::enable_write_index_searcher_cache ? add_into_searcher_cache()
                                                                    : Status::OK();
    _indices_dirs.clear();
    return cache_status;
}
254
255
3
std::vector<std::string> IndexFileWriter::get_index_file_names() const {
256
3
    std::vector<std::string> file_names;
257
3
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
258
2
        if (_closed && _file_info.index_info_size() > 0) {
259
1
            for (const auto& index_info : _file_info.index_info()) {
260
1
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
261
1
                        _rowset_id, _seg_id, index_info.index_id(), index_info.index_suffix()));
262
1
            }
263
1
        } else {
264
2
            for (const auto& [index_info, _] : _indices_dirs) {
265
2
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
266
2
                        _rowset_id, _seg_id, index_info.first, index_info.second));
267
2
            }
268
1
        }
269
2
    } else {
270
1
        file_names.emplace_back(
271
1
                InvertedIndexDescriptor::get_index_file_name_v2(_rowset_id, _seg_id));
272
1
    }
273
3
    return file_names;
274
3
}
275
276
0
std::string IndexFileWriter::debug_string() const {
277
0
    std::stringstream indices_dirs;
278
0
    for (const auto& [index, dir] : _indices_dirs) {
279
0
        indices_dirs << "index id is: " << index.first << " , index suffix is: " << index.second
280
0
                     << " , index dir is: " << dir->toString();
281
0
    }
282
0
    return fmt::format(
283
0
            "inverted index file writer debug string: index storage format is: {}, index path "
284
0
            "prefix is: {}, rowset id is: {}, seg id is: {}, closed is: {}, total file size "
285
0
            "is: {}, index dirs is: {}",
286
0
            _storage_format, _index_path_prefix, _rowset_id, _seg_id, _closed, _total_file_size,
287
0
            indices_dirs.str());
288
0
}
289
290
} // namespace doris::segment_v2