Coverage Report

Created: 2026-03-29 03:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/index_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/index_file_reader.h"
19
20
#include <memory>
21
#include <utility>
22
23
#include "storage/index/inverted/inverted_index_compound_reader.h"
24
#include "storage/index/inverted/inverted_index_fs_directory.h"
25
#include "storage/tablet/tablet_schema.h"
26
#include "util/debug_points.h"
27
28
namespace doris::segment_v2 {
29
30
2.41k
Status IndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) {
31
2.41k
    std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
32
2.41k
    if (!_inited) {
33
2.31k
        _read_buffer_size = read_buffer_size;
34
2.31k
        if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
35
2.30k
            RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx));
36
2.30k
        }
37
2.29k
        _inited = true;
38
2.29k
    }
39
2.40k
    return Status::OK();
40
2.41k
}
41
42
2.30k
Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) {
43
2.30k
    auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
44
45
2.30k
    try {
46
2.30k
        CLuceneError err;
47
2.30k
        CL_NS(store)::IndexInput* index_input = nullptr;
48
49
        // 1. get file size from meta
50
2.30k
        int64_t file_size = -1;
51
2.30k
        if (_idx_file_info.has_index_size()) {
52
1.59k
            file_size = _idx_file_info.index_size();
53
1.59k
        }
54
2.30k
        file_size = file_size == 0 ? -1 : file_size;
55
56
2.30k
        DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
57
2.30k
            if (file_size == -1) {
58
2.30k
                return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
59
2.30k
                        "CLuceneError occur file size = -1, file is {}", index_file_full_path);
60
2.30k
            }
61
2.30k
        })
62
63
2.30k
        DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: "
64
0
                               << index_file_full_path;
65
        // 2. open file
66
2.30k
        auto ok =
67
2.30k
                DorisFSDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), index_input,
68
2.30k
                                                     err, read_buffer_size, file_size, _tablet_id);
69
2.30k
        if (!ok) {
70
15
            if (err.number() == CL_ERR_FileNotFound) {
71
14
                return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
72
14
                        "inverted index file {} is not found.", index_file_full_path);
73
14
            } else if (err.number() == CL_ERR_EmptyIndexSegment) {
74
1
                return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
75
1
                        "inverted index file {} is empty.", index_file_full_path);
76
1
            }
77
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
78
0
                    "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
79
0
                    err.what());
80
15
        }
81
2.28k
        _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
82
2.28k
        _stream->setIoContext(io_ctx);
83
2.28k
        _stream->setIndexFile(true);
84
85
        // 3. read file
86
2.28k
        int32_t version = _stream->readInt(); // Read version number
87
2.28k
        if (version >= InvertedIndexStorageFormatPB::V2) {
88
2.28k
            DCHECK(version == _storage_format);
89
2.28k
            int32_t numIndices = _stream->readInt(); // Read number of indices
90
91
69.0k
            for (int32_t i = 0; i < numIndices; ++i) {
92
66.7k
                int64_t indexId = _stream->readLong();      // Read index ID
93
66.7k
                int32_t suffix_length = _stream->readInt(); // Read suffix length
94
66.7k
                std::vector<uint8_t> suffix_data(suffix_length);
95
66.7k
                _stream->readBytes(suffix_data.data(), suffix_length);
96
66.7k
                std::string suffix_str(suffix_data.begin(), suffix_data.end());
97
98
66.7k
                int32_t numFiles = _stream->readInt(); // Read number of files in the index
99
100
66.7k
                auto fileEntries = std::make_unique<EntriesType>();
101
66.7k
                fileEntries->reserve(numFiles);
102
103
690k
                for (int32_t j = 0; j < numFiles; ++j) {
104
623k
                    int32_t file_name_length = _stream->readInt();
105
623k
                    std::string file_name(file_name_length, '\0');
106
623k
                    _stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()),
107
623k
                                       file_name_length);
108
623k
                    auto entry = std::make_unique<ReaderFileEntry>();
109
623k
                    entry->file_name = std::move(file_name);
110
623k
                    entry->offset = _stream->readLong();
111
623k
                    entry->length = _stream->readLong();
112
623k
                    fileEntries->emplace(entry->file_name, std::move(entry));
113
623k
                }
114
115
66.7k
                _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)),
116
66.7k
                                         std::move(fileEntries));
117
66.7k
            }
118
2.28k
        } else {
119
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
120
0
                    "unknown inverted index format {}", version);
121
0
        }
122
2.28k
    } catch (CLuceneError& err) {
123
1
        if (_stream != nullptr) {
124
1
            try {
125
1
                _stream->close();
126
1
            } catch (CLuceneError& err) {
127
0
                return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
128
0
                        "CLuceneError occur when close idx file {}, error msg: {}",
129
0
                        index_file_full_path, err.what());
130
0
            }
131
1
        }
132
1
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
133
1
                "CLuceneError occur when init idx file {}, error msg: {}", index_file_full_path,
134
1
                err.what());
135
1
    }
136
2.28k
    return Status::OK();
137
2.30k
}
138
139
182
Result<InvertedIndexDirectoryMap> IndexFileReader::get_all_directories() {
140
182
    InvertedIndexDirectoryMap res;
141
182
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
142
1.64k
    for (auto& [index, _] : _indices_entries) {
143
1.64k
        auto&& [index_id, index_suffix] = index;
144
1.64k
        LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix;
145
1.64k
        auto ret = _open(index_id, index_suffix);
146
1.64k
        if (!ret.has_value()) {
147
0
            return ResultError(ret.error());
148
0
        }
149
1.64k
        res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value()));
150
1.64k
    }
151
182
    return res;
152
182
}
153
154
Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::_open(
155
4.76k
        int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const {
156
4.76k
    std::unique_ptr<DorisCompoundReader, DirectoryDeleter> compound_reader;
157
158
4.76k
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
159
15
        auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
160
15
                _index_path_prefix, index_id, index_suffix);
161
15
        try {
162
15
            CLuceneError err;
163
15
            CL_NS(store)::IndexInput* index_input = nullptr;
164
165
            // 1. get file size from meta
166
15
            int64_t file_size = -1;
167
15
            if (_idx_file_info.index_info_size() > 0) {
168
1
                for (const auto& idx_info : _idx_file_info.index_info()) {
169
1
                    if (index_id == idx_info.index_id() &&
170
1
                        index_suffix == idx_info.index_suffix()) {
171
1
                        file_size = idx_info.index_file_size();
172
1
                        break;
173
1
                    }
174
1
                }
175
1
            }
176
15
            file_size = file_size == 0 ? -1 : file_size;
177
15
            DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
178
15
                if (file_size == -1) {
179
15
                    return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
180
15
                            "CLuceneError occur file size = -1, file is {}", index_file_path));
181
15
                }
182
15
            })
183
15
            DCHECK(_fs != nullptr)
184
0
                    << "file system is nullptr, index_file_path: " << index_file_path;
185
            // 2. open file
186
15
            auto ok = DorisFSDirectory::FSIndexInput::open(_fs, index_file_path.c_str(),
187
15
                                                           index_input, err, _read_buffer_size,
188
15
                                                           file_size, _tablet_id);
189
15
            if (!ok) {
190
                // now index_input = nullptr
191
1
                if (err.number() == CL_ERR_FileNotFound) {
192
1
                    return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
193
1
                            "inverted index file {} is not found.", index_file_path));
194
1
                }
195
0
                return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
196
0
                        "CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
197
0
                        err.what()));
198
1
            }
199
200
            // 3. read file in DorisCompoundReader
201
14
            compound_reader.reset(new DorisCompoundReader(index_input, _read_buffer_size));
202
14
        } catch (CLuceneError& err) {
203
0
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
204
0
                    "CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
205
0
                    err.what()));
206
0
        }
207
4.74k
    } else {
208
4.74k
        std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
209
4.74k
        if (_stream == nullptr) {
210
1
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
211
1
                    "CLuceneError occur when open idx file {}, stream is nullptr",
212
1
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)));
213
1
        }
214
215
        // Check if the specified index exists
216
4.74k
        auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix));
217
4.74k
        if (index_it == _indices_entries.end()) {
218
4
            std::ostringstream errMsg;
219
4
            errMsg << "No index with id " << index_id << " found";
220
4
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
221
4
                    "CLuceneError occur when open idx file {}, error msg: {}",
222
4
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix),
223
4
                    errMsg.str()));
224
4
        }
225
        // Need to clone resource here, because index searcher cache need it.
226
4.74k
        compound_reader.reset(new DorisCompoundReader(_stream->clone(), *index_it->second,
227
4.74k
                                                      _read_buffer_size, io_ctx));
228
4.74k
    }
229
4.75k
    return compound_reader;
230
4.76k
}
231
232
Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::open(
233
2.50k
        const TabletIndex* index_meta, const io::IOContext* io_ctx) const {
234
2.50k
    auto index_id = index_meta->index_id();
235
2.50k
    auto index_suffix = index_meta->get_index_suffix();
236
2.50k
    return _open(index_id, index_suffix, io_ctx);
237
2.50k
}
238
239
331
std::string IndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const {
240
331
    return InvertedIndexDescriptor::get_index_file_cache_key(
241
331
            _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
242
331
}
243
244
1.30k
std::string IndexFileReader::get_index_file_path(const TabletIndex* index_meta) const {
245
1.30k
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
246
0
        return InvertedIndexDescriptor::get_index_file_path_v1(
247
0
                _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
248
0
    }
249
1.30k
    return InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
250
1.30k
}
251
252
1
Status IndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const {
253
1
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
254
0
        auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
255
0
                _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
256
0
        return _fs->exists(index_file_path, res);
257
1
    } else {
258
1
        std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
259
1
        if (_stream == nullptr) {
260
1
            *res = false;
261
1
            return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
262
1
                    "idx file {} is not opened",
263
1
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
264
1
        }
265
        // Check if the specified index exists
266
0
        auto index_it = _indices_entries.find(
267
0
                std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
268
0
        if (index_it == _indices_entries.end()) {
269
0
            *res = false;
270
0
        } else {
271
0
            *res = true;
272
0
        }
273
0
    }
274
0
    return Status::OK();
275
1
}
276
277
5
Status IndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const {
278
5
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
279
1
        *res = true;
280
1
        return Status::OK();
281
1
    }
282
4
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
283
4
    if (_stream == nullptr) {
284
1
        return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
285
1
                "idx file {} is not opened",
286
1
                InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
287
1
    }
288
    // Check if the specified index exists
289
3
    auto index_it = _indices_entries.find(
290
3
            std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
291
3
    if (index_it == _indices_entries.end()) {
292
1
        *res = false;
293
2
    } else {
294
2
        const auto& entries = index_it->second;
295
2
        auto entry_it =
296
2
                entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
297
2
        if (entry_it == entries->end()) {
298
0
            *res = false;
299
0
            return Status::OK();
300
0
        }
301
2
        const auto& e = entry_it->second;
302
        // roaring bitmap cookie header size is 5
303
2
        if (e->length <= 5) {
304
1
            *res = false;
305
1
        } else {
306
1
            *res = true;
307
1
        }
308
2
    }
309
3
    return Status::OK();
310
3
}
311
312
1
void IndexFileReader::debug_file_entries() {
313
1
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
314
1
    for (const auto& index : _indices_entries) {
315
1
        LOG(INFO) << "index_id:" << index.first.first;
316
1
        const auto& index_entries = index.second;
317
2
        for (const auto& entry : *index_entries) {
318
2
            const auto& file_entry = entry.second;
319
            LOG(INFO) << "file entry name:" << file_entry->file_name
320
2
                      << " length:" << file_entry->length << " offset:" << file_entry->offset;
321
2
        }
322
1
    }
323
1
}
324
325
} // namespace doris::segment_v2