Coverage Report

Created: 2026-07-02 14:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/index_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/index_file_writer.h"
19
20
#include <glog/logging.h>
21
22
#include <atomic>
23
#include <filesystem>
24
25
#include "common/status.h"
26
#include "io/fs/packed_file_writer.h"
27
#include "io/fs/s3_file_writer.h"
28
#include "io/fs/stream_sink_file_writer.h"
29
#include "storage/index/ann/ann_index_files.h"
30
#include "storage/index/index_file_reader.h"
31
#include "storage/index/index_storage_format_v1.h"
32
#include "storage/index/index_storage_format_v2.h"
33
#include "storage/index/inverted/inverted_index_compound_reader.h"
34
#include "storage/index/inverted/inverted_index_desc.h"
35
#include "storage/index/inverted/inverted_index_fs_directory.h"
36
#include "storage/index/inverted/inverted_index_reader.h"
37
#include "storage/tablet/tablet_schema.h"
38
39
namespace doris::segment_v2 {
40
41
IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix,
42
                                 std::string rowset_id, int64_t seg_id,
43
                                 InvertedIndexStorageFormatPB storage_format,
44
                                 io::FileWriterPtr file_writer, bool can_use_ram_dir,
45
                                 int64_t tablet_id)
46
1.06k
        : _fs(std::move(fs)),
47
1.06k
          _index_path_prefix(std::move(index_path_prefix)),
48
1.06k
          _rowset_id(std::move(rowset_id)),
49
1.06k
          _seg_id(seg_id),
50
1.06k
          _storage_format(storage_format),
51
1.06k
          _local_fs(io::global_local_filesystem()),
52
1.06k
          _idx_v2_writer(std::move(file_writer)),
53
1.06k
          _can_use_ram_dir(can_use_ram_dir),
54
1.06k
          _tablet_id(tablet_id) {
55
1.06k
    auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
56
1.06k
    _tmp_dir = tmp_file_dir.native();
57
1.06k
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
58
70
        _index_storage_format = std::make_unique<IndexStorageFormatV1>(this);
59
992
    } else {
60
992
        _index_storage_format = std::make_unique<IndexStorageFormatV2>(this);
61
992
    }
62
1.06k
}
63
64
40
Status IndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) {
65
40
    _indices_dirs = std::move(indices_dirs);
66
40
    return Status::OK();
67
40
}
68
69
Status IndexFileWriter::_insert_directory_into_map(int64_t index_id,
70
                                                   const std::string& index_suffix,
71
5.70k
                                                   std::shared_ptr<DorisFSDirectory> dir) {
72
5.70k
    auto key = std::make_pair(index_id, index_suffix);
73
5.70k
    auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir));
74
5.70k
    if (!inserted) {
75
2
        LOG(ERROR) << "IndexFileWriter::open attempted to insert a duplicate key: (" << key.first
76
2
                   << ", " << key.second << ")";
77
2
        LOG(ERROR) << "Directories already in map: ";
78
2
        for (const auto& entry : _indices_dirs) {
79
2
            LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")";
80
2
        }
81
2
        return Status::InternalError("IndexFileWriter::open attempted to insert a duplicate dir");
82
2
    }
83
5.70k
    return Status::OK();
84
5.70k
}
85
86
5.68k
Result<std::shared_ptr<DorisFSDirectory>> IndexFileWriter::open(const TabletIndex* index_meta) {
87
5.68k
    auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
88
5.68k
            _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix());
89
5.68k
    auto dir = std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory(
90
5.68k
            _local_fs, local_fs_index_path.c_str(), _can_use_ram_dir));
91
5.68k
    auto st =
92
5.68k
            _insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir);
93
5.68k
    if (!st.ok()) {
94
0
        return ResultError(st);
95
0
    }
96
97
5.68k
    return dir;
98
5.68k
}
99
100
22
Status IndexFileWriter::delete_index(const TabletIndex* index_meta) {
101
22
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_index_meta_nullptr", { index_meta = nullptr; });
102
22
    if (!index_meta) {
103
2
        return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null.");
104
2
    }
105
106
20
    auto index_id = index_meta->index_id();
107
20
    const auto& index_suffix = index_meta->get_index_suffix();
108
109
    // Check if the specified index exists
110
20
    auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix));
111
20
    DBUG_EXECUTE_IF("IndexFileWriter::delete_index_indices_dirs_reach_end",
112
20
                    { index_it = _indices_dirs.end(); })
113
20
    if (index_it == _indices_dirs.end()) {
114
10
        std::ostringstream errMsg;
115
10
        errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix
116
10
               << " found.";
117
10
        LOG(WARNING) << errMsg.str();
118
10
        return Status::OK();
119
10
    }
120
121
10
    _indices_dirs.erase(index_it);
122
10
    return Status::OK();
123
20
}
124
125
16
Status IndexFileWriter::add_into_searcher_cache() {
126
16
    auto index_file_reader = std::make_unique<IndexFileReader>(
127
16
            _fs, _index_path_prefix, _storage_format, InvertedIndexFileInfo(), _tablet_id);
128
16
    auto st = index_file_reader->init();
129
16
    if (!st.ok()) {
130
2
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr) {
131
            // StreamSinkFileWriter not found file is normal.
132
0
            return Status::OK();
133
0
        }
134
2
        if (dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
135
            // PackedFileWriter: file may be merged, skip cache for now.
136
            // The cache will be populated on first read.
137
0
            return Status::OK();
138
0
        }
139
2
        LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for " << _index_path_prefix
140
2
                     << ", error " << st.msg();
141
2
        return st;
142
2
    }
143
14
    for (const auto& entry : _indices_dirs) {
144
14
        auto index_meta = entry.first;
145
14
        auto dir = DORIS_TRY(index_file_reader->_open(index_meta.first, index_meta.second));
146
14
        std::vector<std::string> file_names;
147
14
        dir->list(&file_names);
148
        // Skip ANN indexes – they use FAISS files (ann.faiss, ann.ivfdata) instead of
149
        // CLucene segments, so building an inverted-index searcher would fail.
150
        // HNSW/IVF produces 1 file (ann.faiss); IVF_ON_DISK produces 2 (ann.faiss + ann.ivfdata).
151
14
        bool is_ann_index =
152
14
                std::any_of(file_names.begin(), file_names.end(), [](const std::string& f) {
153
14
                    return f == faiss_index_fila_name || f == faiss_ivfdata_file_name;
154
14
                });
155
14
        if (is_ann_index) {
156
0
            continue;
157
0
        }
158
14
        auto index_file_key = InvertedIndexDescriptor::get_index_file_cache_key(
159
14
                _index_path_prefix, index_meta.first, index_meta.second);
160
14
        InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
161
14
        InvertedIndexCacheHandle inverted_index_cache_handle;
162
14
        if (InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
163
14
                                                           &inverted_index_cache_handle)) {
164
2
            st = InvertedIndexSearcherCache::instance()->erase(searcher_cache_key.index_file_path);
165
2
            if (!st.ok()) {
166
0
                LOG(WARNING) << "IndexFileWriter::add_into_searcher_cache for "
167
0
                             << _index_path_prefix << ", error " << st.msg();
168
0
            }
169
2
        }
170
14
        IndexSearcherPtr searcher;
171
14
        size_t reader_size = 0;
172
14
        auto index_searcher_builder = DORIS_TRY(_construct_index_searcher_builder(dir.get()));
173
14
        RETURN_IF_ERROR(InvertedIndexReader::create_index_searcher(
174
14
                index_searcher_builder.get(), dir.get(), &searcher, reader_size));
175
14
        auto* cache_value = new InvertedIndexSearcherCache::CacheValue(std::move(searcher),
176
14
                                                                       reader_size, UnixMillis());
177
14
        InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value);
178
14
    }
179
14
    return Status::OK();
180
14
}
181
182
Result<std::unique_ptr<IndexSearcherBuilder>> IndexFileWriter::_construct_index_searcher_builder(
183
0
        const DorisCompoundReader* dir) {
184
0
    std::vector<std::string> files;
185
0
    dir->list(&files);
186
0
    auto reader_type = InvertedIndexReaderType::FULLTEXT;
187
0
    bool found_bkd = std::any_of(files.begin(), files.end(), [](const std::string& file) {
188
0
        return file == InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name();
189
0
    });
190
0
    if (found_bkd) {
191
0
        reader_type = InvertedIndexReaderType::BKD;
192
0
    }
193
0
    return IndexSearcherBuilder::create_index_searcher_builder(reader_type);
194
0
}
195
196
952
Status IndexFileWriter::begin_close() {
197
952
    DCHECK(!_closed) << debug_string();
198
952
    _closed = true;
199
952
    if (_indices_dirs.empty()) {
200
        // An empty file must still be created even if there are no indexes to write
201
28
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
202
28
            dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
203
28
            dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
204
4
            return _idx_v2_writer->close(true);
205
4
        }
206
24
        return Status::OK();
207
28
    }
208
924
    DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", {
209
924
        if (_storage_format != InvertedIndexStorageFormatPB::V2) {
210
924
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
211
924
                    "IndexFileWriter::close fault injection:inverted index storage format "
212
924
                    "must be v2");
213
924
        }
214
924
    })
215
924
    try {
216
924
        RETURN_IF_ERROR(_index_storage_format->write());
217
5.66k
        for (const auto& entry : _indices_dirs) {
218
5.66k
            const auto& dir = entry.second;
219
            // delete index path, which contains separated inverted index files
220
5.66k
            if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
221
180
                auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get());
222
180
                compound_dir->deleteDirectory();
223
180
            }
224
5.66k
        }
225
910
    } catch (CLuceneError& err) {
226
0
        if (_storage_format == InvertedIndexStorageFormatPB::V1) {
227
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
228
0
                    "CLuceneError occur when close, error msg: {}", err.what());
229
0
        } else {
230
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
231
0
                    "CLuceneError occur when close idx file {}, error msg: {}",
232
0
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix),
233
0
                    err.what());
234
0
        }
235
0
    }
236
910
    return Status::OK();
237
924
}
238
239
922
Status IndexFileWriter::finish_close() {
240
922
    DCHECK(_closed) << debug_string();
241
922
    if (_indices_dirs.empty()) {
242
        // An empty file must still be created even if there are no indexes to write
243
26
        if (dynamic_cast<io::StreamSinkFileWriter*>(_idx_v2_writer.get()) != nullptr ||
244
26
            dynamic_cast<io::S3FileWriter*>(_idx_v2_writer.get()) != nullptr ||
245
26
            dynamic_cast<io::PackedFileWriter*>(_idx_v2_writer.get()) != nullptr) {
246
4
            return _idx_v2_writer->close(false);
247
4
        }
248
22
        return Status::OK();
249
26
    }
250
896
    if (_idx_v2_writer != nullptr && _idx_v2_writer->state() != io::FileWriter::State::CLOSED) {
251
852
        RETURN_IF_ERROR(_idx_v2_writer->close(false));
252
852
    }
253
254
896
    Status st = Status::OK();
255
896
    if (config::enable_write_index_searcher_cache) {
256
14
        st = add_into_searcher_cache();
257
14
    }
258
896
    _indices_dirs.clear();
259
896
    return st;
260
896
}
261
262
6
std::vector<std::string> IndexFileWriter::get_index_file_names() const {
263
6
    std::vector<std::string> file_names;
264
6
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
265
4
        if (_closed && _file_info.index_info_size() > 0) {
266
2
            for (const auto& index_info : _file_info.index_info()) {
267
2
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
268
2
                        _rowset_id, _seg_id, index_info.index_id(), index_info.index_suffix()));
269
2
            }
270
2
        } else {
271
4
            for (const auto& [index_info, _] : _indices_dirs) {
272
4
                file_names.emplace_back(InvertedIndexDescriptor::get_index_file_name_v1(
273
4
                        _rowset_id, _seg_id, index_info.first, index_info.second));
274
4
            }
275
2
        }
276
4
    } else {
277
2
        file_names.emplace_back(
278
2
                InvertedIndexDescriptor::get_index_file_name_v2(_rowset_id, _seg_id));
279
2
    }
280
6
    return file_names;
281
6
}
282
283
0
std::string IndexFileWriter::debug_string() const {
284
0
    std::stringstream indices_dirs;
285
0
    for (const auto& [index, dir] : _indices_dirs) {
286
0
        indices_dirs << "index id is: " << index.first << " , index suffix is: " << index.second
287
0
                     << " , index dir is: " << dir->toString();
288
0
    }
289
0
    return fmt::format(
290
0
            "inverted index file writer debug string: index storage format is: {}, index path "
291
0
            "prefix is: {}, rowset id is: {}, seg id is: {}, closed is: {}, total file size "
292
0
            "is: {}, index dirs is: {}",
293
0
            _storage_format, _index_path_prefix, _rowset_id, _seg_id, _closed, _total_file_size,
294
0
            indices_dirs.str());
295
0
}
296
297
} // namespace doris::segment_v2