Coverage Report

Created: 2026-04-20 15:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/task/index_builder.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/task/index_builder.h"
19
20
#include <mutex>
21
22
#include "common/logging.h"
23
#include "common/status.h"
24
#include "storage/field.h"
25
#include "storage/index/index_file_reader.h"
26
#include "storage/index/index_file_writer.h"
27
#include "storage/index/inverted/inverted_index_desc.h"
28
#include "storage/index/inverted/inverted_index_fs_directory.h"
29
#include "storage/olap_define.h"
30
#include "storage/rowset/beta_rowset.h"
31
#include "storage/rowset/rowset_writer_context.h"
32
#include "storage/segment/segment_loader.h"
33
#include "storage/storage_engine.h"
34
#include "storage/tablet/tablet_schema.h"
35
#include "util/debug_points.h"
36
#include "util/trace.h"
37
38
namespace doris {
39
40
IndexBuilder::IndexBuilder(StorageEngine& engine, TabletSharedPtr tablet,
41
                           const std::vector<TColumn>& columns,
42
                           const std::vector<doris::TOlapTableIndex>& alter_inverted_indexes,
43
                           bool is_drop_op)
44
22
        : _engine(engine),
45
22
          _tablet(std::move(tablet)),
46
22
          _columns(columns),
47
22
          _alter_inverted_indexes(alter_inverted_indexes),
48
22
          _is_drop_op(is_drop_op) {
49
22
    _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
50
22
}
51
52
22
IndexBuilder::~IndexBuilder() {
53
22
    _olap_data_convertor.reset();
54
22
    _index_column_writers.clear();
55
22
}
56
57
22
Status IndexBuilder::init() {
58
24
    for (auto inverted_index : _alter_inverted_indexes) {
59
24
        _alter_index_ids.insert(inverted_index.index_id);
60
24
    }
61
22
    return Status::OK();
62
22
}
63
64
18
Status IndexBuilder::update_inverted_index_info() {
65
    // just do link files
66
18
    LOG(INFO) << "begin to update_inverted_index_info, tablet=" << _tablet->tablet_id()
67
18
              << ", is_drop_op=" << _is_drop_op;
68
    // index ids that will not be linked
69
18
    std::set<int64_t> without_index_uids;
70
18
    _output_rowsets.reserve(_input_rowsets.size());
71
18
    _pending_rs_guards.reserve(_input_rowsets.size());
72
18
    for (auto&& input_rowset : _input_rowsets) {
73
18
        bool is_local_rowset = input_rowset->is_local();
74
18
        DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset",
75
18
                        { is_local_rowset = false; })
76
18
        if (!is_local_rowset) [[unlikely]] {
77
            // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id();
78
0
            return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}",
79
0
                                         _tablet->tablet_id(),
80
0
                                         input_rowset->rowset_id().to_string());
81
0
        }
82
83
18
        TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>();
84
18
        const auto& input_rs_tablet_schema = input_rowset->tablet_schema();
85
18
        output_rs_tablet_schema->copy_from(*input_rs_tablet_schema);
86
18
        int64_t total_index_size = 0;
87
18
        auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get());
88
18
        auto size_st = beta_rowset->get_inverted_index_size(&total_index_size);
89
18
        DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", {
90
18
            size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed");
91
18
        })
92
18
        if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() &&
93
18
            !size_st.is<ErrorCode::NOT_FOUND>()) {
94
0
            return size_st;
95
0
        }
96
18
        auto num_segments = input_rowset->num_segments();
97
18
        size_t drop_index_size = 0;
98
99
18
        if (_is_drop_op) {
100
4
            for (const auto& t_inverted_index : _alter_inverted_indexes) {
101
4
                DCHECK_EQ(t_inverted_index.columns.size(), 1);
102
4
                auto column_name = t_inverted_index.columns[0];
103
4
                auto column_idx = output_rs_tablet_schema->field_index(column_name);
104
4
                if (column_idx < 0) {
105
0
                    if (!t_inverted_index.column_unique_ids.empty()) {
106
0
                        auto column_unique_id = t_inverted_index.column_unique_ids[0];
107
0
                        column_idx = output_rs_tablet_schema->field_index(column_unique_id);
108
0
                    }
109
0
                    if (column_idx < 0) {
110
0
                        LOG(WARNING) << "referenced column was missing. "
111
0
                                     << "[column=" << column_name
112
0
                                     << " referenced_column=" << column_idx << "]";
113
0
                        continue;
114
0
                    }
115
0
                }
116
4
                auto column = output_rs_tablet_schema->column(column_idx);
117
118
                // inverted index
119
4
                auto index_metas = output_rs_tablet_schema->inverted_indexs(column);
120
4
                for (const auto& index_meta : index_metas) {
121
                    // Only drop the index that matches the requested index_id,
122
                    // not all indexes on this column
123
4
                    if (index_meta->index_id() != t_inverted_index.index_id) {
124
1
                        continue;
125
1
                    }
126
3
                    if (output_rs_tablet_schema->get_inverted_index_storage_format() ==
127
3
                        InvertedIndexStorageFormatPB::V1) {
128
1
                        const auto& fs = io::global_local_filesystem();
129
130
2
                        for (int seg_id = 0; seg_id < num_segments; seg_id++) {
131
1
                            auto seg_path = local_segment_path(
132
1
                                    _tablet->tablet_path(), input_rowset->rowset_id().to_string(),
133
1
                                    seg_id);
134
1
                            auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
135
1
                                    InvertedIndexDescriptor::get_index_file_path_prefix(seg_path),
136
1
                                    index_meta->index_id(), index_meta->get_index_suffix());
137
1
                            int64_t index_size = 0;
138
1
                            RETURN_IF_ERROR(fs->file_size(index_path, &index_size));
139
1
                            VLOG_DEBUG << "inverted index file:" << index_path
140
0
                                       << " size:" << index_size;
141
1
                            drop_index_size += index_size;
142
1
                        }
143
1
                    }
144
3
                    _dropped_inverted_indexes.push_back(*index_meta);
145
                    // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED.
146
                    // remove dropped index_meta from output rowset tablet schema
147
3
                    output_rs_tablet_schema->remove_index(index_meta->index_id());
148
3
                }
149
150
                // ann index
151
4
                const auto* ann_index = output_rs_tablet_schema->ann_index(column);
152
4
                if (!ann_index) {
153
3
                    continue;
154
3
                }
155
                // Only drop the ann index that matches the requested index_id
156
1
                if (ann_index->index_id() != t_inverted_index.index_id) {
157
0
                    continue;
158
0
                }
159
1
                DCHECK(output_rs_tablet_schema->get_inverted_index_storage_format() !=
160
1
                       InvertedIndexStorageFormatPB::V1);
161
1
                _dropped_inverted_indexes.push_back(*ann_index);
162
                // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED.
163
                // remove dropped index_meta from output rowset tablet schema
164
1
                output_rs_tablet_schema->remove_index(ann_index->index_id());
165
1
            }
166
167
4
            DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", {
168
4
                auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
169
4
                        "index_builder.update_inverted_index_info.drop_index", "indexes_count", 0);
170
4
                if (indexes_count < 0) {
171
4
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
172
4
                            "indexes count cannot be negative");
173
4
                }
174
4
                auto indexes_size = output_rs_tablet_schema->inverted_indexes().size();
175
4
                if (indexes_count != indexes_size) {
176
4
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
177
4
                            "indexes count not equal to expected");
178
4
                }
179
4
            })
180
14
        } else {
181
            // base on input rowset's tablet_schema to build
182
            // output rowset's tablet_schema which only add
183
            // the indexes specified in this build index request
184
16
            for (auto t_inverted_index : _alter_inverted_indexes) {
185
16
                TabletIndex index;
186
16
                index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema);
187
16
                auto column_uid = index.col_unique_ids()[0];
188
16
                if (column_uid < 0) {
189
3
                    LOG(WARNING) << "referenced column was missing. "
190
3
                                 << "[column=" << t_inverted_index.columns[0]
191
3
                                 << " referenced_column=" << column_uid << "]";
192
3
                    continue;
193
3
                }
194
13
                const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid);
195
196
                // inverted index
197
13
                auto exist_indexs = output_rs_tablet_schema->inverted_indexs(col);
198
13
                for (const auto& exist_index : exist_indexs) {
199
0
                    if (exist_index->index_id() != index.index_id()) {
200
0
                        if (exist_index->is_same_except_id(&index)) {
201
0
                            LOG(WARNING) << fmt::format(
202
0
                                    "column: {} has a exist inverted index, but the index id not "
203
0
                                    "equal "
204
0
                                    "request's index id, , exist index id: {}, request's index id: "
205
0
                                    "{}, "
206
0
                                    "remove exist index in new output_rs_tablet_schema",
207
0
                                    column_uid, exist_index->index_id(), index.index_id());
208
0
                            without_index_uids.insert(exist_index->index_id());
209
0
                            output_rs_tablet_schema->remove_index(exist_index->index_id());
210
0
                        }
211
0
                    }
212
0
                }
213
214
                // ann index
215
13
                const auto* exist_index = output_rs_tablet_schema->ann_index(col);
216
13
                if (exist_index && exist_index->index_id() != index.index_id()) {
217
0
                    if (exist_index->is_same_except_id(&index)) {
218
0
                        LOG(WARNING) << fmt::format(
219
0
                                "column: {} has a exist ann index, but the index id not "
220
0
                                "equal request's index id, , exist index id: {}, request's index "
221
0
                                "id: {}, remove exist index in new output_rs_tablet_schema",
222
0
                                column_uid, exist_index->index_id(), index.index_id());
223
0
                        without_index_uids.insert(exist_index->index_id());
224
0
                        output_rs_tablet_schema->remove_index(exist_index->index_id());
225
0
                    }
226
0
                }
227
228
13
                output_rs_tablet_schema->append_index(std::move(index));
229
13
            }
230
14
        }
231
        // construct input rowset reader
232
18
        RowsetReaderSharedPtr input_rs_reader;
233
18
        RETURN_IF_ERROR(input_rowset->create_reader(&input_rs_reader));
234
        // construct output rowset writer
235
18
        RowsetWriterContext context;
236
18
        context.version = input_rs_reader->version();
237
18
        context.rowset_state = VISIBLE;
238
18
        context.segments_overlap = input_rowset->rowset_meta()->segments_overlap();
239
18
        context.tablet_schema = output_rs_tablet_schema;
240
18
        context.newest_write_timestamp = input_rs_reader->newest_write_timestamp();
241
18
        auto output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
242
18
        _pending_rs_guards.push_back(_engine.add_pending_rowset(context));
243
244
        // if without_index_uids is not empty, copy _alter_index_ids to it
245
        // else just use _alter_index_ids to avoid copy
246
18
        if (!without_index_uids.empty()) {
247
0
            without_index_uids.insert(_alter_index_ids.begin(), _alter_index_ids.end());
248
0
        }
249
250
        // build output rowset
251
18
        RETURN_IF_ERROR(input_rowset->link_files_to(
252
18
                _tablet->tablet_path(), output_rs_writer->rowset_id(), 0,
253
18
                without_index_uids.empty() ? &_alter_index_ids : &without_index_uids));
254
255
18
        auto input_rowset_meta = input_rowset->rowset_meta();
256
18
        RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
257
18
        rowset_meta->set_num_rows(input_rowset_meta->num_rows());
258
18
        if (output_rs_tablet_schema->get_inverted_index_storage_format() ==
259
18
            InvertedIndexStorageFormatPB::V1) {
260
4
            if (_is_drop_op) {
261
1
                VLOG_DEBUG << "data_disk_size:" << input_rowset_meta->data_disk_size()
262
0
                           << " total_disk_size:" << input_rowset_meta->total_disk_size()
263
0
                           << " index_disk_size:" << input_rowset_meta->index_disk_size()
264
0
                           << " drop_index_size:" << drop_index_size;
265
1
                rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() -
266
1
                                                 drop_index_size);
267
1
                rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
268
1
                rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() -
269
1
                                                 drop_index_size);
270
3
            } else {
271
3
                rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size());
272
3
                rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
273
3
                rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size());
274
3
            }
275
14
        } else {
276
31
            for (int seg_id = 0; seg_id < num_segments; seg_id++) {
277
17
                auto seg_path = DORIS_TRY(input_rowset->segment_path(seg_id));
278
17
                auto idx_file_reader = std::make_unique<IndexFileReader>(
279
17
                        context.fs(),
280
17
                        std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
281
17
                        output_rs_tablet_schema->get_inverted_index_storage_format(),
282
17
                        InvertedIndexFileInfo(), _tablet->tablet_id());
283
17
                auto st = idx_file_reader->init();
284
17
                DBUG_EXECUTE_IF(
285
17
                        "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", {
286
17
                            st = Status::Error<ErrorCode::INIT_FAILED>(
287
17
                                    "debug point: reader init error");
288
17
                        })
289
17
                if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
290
0
                    return st;
291
0
                }
292
17
                _index_file_readers.emplace(
293
17
                        std::make_pair(output_rs_writer->rowset_id().to_string(), seg_id),
294
17
                        std::move(idx_file_reader));
295
17
            }
296
14
            rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() -
297
14
                                             total_index_size);
298
14
            rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
299
14
            rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() -
300
14
                                             total_index_size);
301
14
        }
302
18
        rowset_meta->set_empty(input_rowset_meta->empty());
303
18
        rowset_meta->set_num_segments(input_rowset_meta->num_segments());
304
18
        rowset_meta->set_segments_overlap(input_rowset_meta->segments_overlap());
305
18
        rowset_meta->set_rowset_state(input_rowset_meta->rowset_state());
306
18
        std::vector<KeyBoundsPB> key_bounds;
307
18
        RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
308
18
        rowset_meta->set_segments_key_bounds_truncated(
309
18
                input_rowset_meta->is_segments_key_bounds_truncated());
310
18
        rowset_meta->set_segments_key_bounds_aggregated(
311
18
                input_rowset_meta->is_segments_key_bounds_aggregated());
312
        // key_bounds is already aggregated (size 1) or per-segment; copy verbatim.
313
18
        rowset_meta->set_segments_key_bounds(key_bounds);
314
18
        std::vector<uint32_t> num_segment_rows;
315
18
        input_rowset_meta->get_num_segment_rows(&num_segment_rows);
316
18
        rowset_meta->set_num_segment_rows(num_segment_rows);
317
18
        auto output_rowset = output_rs_writer->manual_build(rowset_meta);
318
18
        if (input_rowset_meta->has_delete_predicate()) {
319
0
            output_rowset->rowset_meta()->set_delete_predicate(
320
0
                    input_rowset_meta->delete_predicate());
321
0
        }
322
18
        _output_rowsets.push_back(output_rowset);
323
18
    }
324
325
18
    return Status::OK();
326
18
}
327
328
Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta,
329
16
                                          std::vector<segment_v2::SegmentSharedPtr>& segments) {
330
16
    bool is_local_rowset = output_rowset_meta->is_local();
331
16
    DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset",
332
16
                    { is_local_rowset = false; })
333
16
    if (!is_local_rowset) [[unlikely]] {
334
        // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id();
335
0
        return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}",
336
0
                                     _tablet->tablet_id(),
337
0
                                     output_rowset_meta->rowset_id().to_string());
338
0
    }
339
340
16
    if (_is_drop_op) {
341
4
        const auto& output_rs_tablet_schema = output_rowset_meta->tablet_schema();
342
4
        if (output_rs_tablet_schema->get_inverted_index_storage_format() !=
343
4
            InvertedIndexStorageFormatPB::V1) {
344
3
            const auto& fs = output_rowset_meta->fs();
345
346
3
            const auto& output_rowset_schema = output_rowset_meta->tablet_schema();
347
3
            size_t inverted_index_size = 0;
348
3
            for (auto& seg_ptr : segments) {
349
3
                auto idx_file_reader_iter = _index_file_readers.find(
350
3
                        std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
351
3
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op",
352
3
                                { idx_file_reader_iter = _index_file_readers.end(); })
353
3
                if (idx_file_reader_iter == _index_file_readers.end()) {
354
0
                    LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":"
355
0
                               << seg_ptr->id() << " cannot be found";
356
0
                    continue;
357
0
                }
358
3
                auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories());
359
360
3
                std::string index_path_prefix {
361
3
                        InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path(
362
3
                                _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(),
363
3
                                seg_ptr->id()))};
364
365
3
                std::string index_path =
366
3
                        InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
367
3
                io::FileWriterPtr file_writer;
368
3
                Status st = fs->create_file(index_path, &file_writer);
369
3
                if (!st.ok()) {
370
0
                    LOG(WARNING) << "failed to create writable file. path=" << index_path
371
0
                                 << ", err: " << st;
372
0
                    return st;
373
0
                }
374
3
                auto index_file_writer = std::make_unique<IndexFileWriter>(
375
3
                        fs, std::move(index_path_prefix),
376
3
                        output_rowset_meta->rowset_id().to_string(), seg_ptr->id(),
377
3
                        output_rowset_schema->get_inverted_index_storage_format(),
378
3
                        std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id());
379
3
                RETURN_IF_ERROR(index_file_writer->initialize(dirs));
380
                // create inverted index writer
381
3
                for (auto& index_meta : _dropped_inverted_indexes) {
382
3
                    RETURN_IF_ERROR(index_file_writer->delete_index(&index_meta));
383
3
                }
384
3
                _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
385
3
            }
386
3
            for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
387
3
                auto st = index_file_writer->begin_close();
388
3
                if (!st.ok()) {
389
0
                    LOG(ERROR) << "close index_file_writer error:" << st;
390
0
                    return st;
391
0
                }
392
3
                inverted_index_size += index_file_writer->get_index_file_total_size();
393
3
            }
394
3
            for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
395
3
                auto st = index_file_writer->finish_close();
396
3
                if (!st.ok()) {
397
0
                    LOG(ERROR) << "wait close index_file_writer error:" << st;
398
0
                    return st;
399
0
                }
400
3
            }
401
3
            _index_file_writers.clear();
402
3
            output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
403
3
            output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
404
3
                                                    inverted_index_size);
405
3
            output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() +
406
3
                                                    inverted_index_size);
407
3
        }
408
4
        LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows();
409
4
        return Status::OK();
410
12
    } else {
411
        // create inverted or ann index writer
412
12
        const auto& fs = output_rowset_meta->fs();
413
12
        auto output_rowset_schema = output_rowset_meta->tablet_schema();
414
12
        size_t inverted_index_size = 0;
415
15
        for (auto& seg_ptr : segments) {
416
15
            std::string index_path_prefix {
417
15
                    InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path(
418
15
                            _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(),
419
15
                            seg_ptr->id()))};
420
15
            std::vector<ColumnId> return_columns;
421
15
            std::vector<std::pair<int64_t, int64_t>> inverted_index_writer_signs;
422
15
            _olap_data_convertor->reserve(_alter_inverted_indexes.size());
423
424
15
            std::unique_ptr<IndexFileWriter> index_file_writer = nullptr;
425
15
            if (output_rowset_schema->get_inverted_index_storage_format() >=
426
15
                InvertedIndexStorageFormatPB::V2) {
427
12
                auto idx_file_reader_iter = _index_file_readers.find(
428
12
                        std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
429
12
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader",
430
12
                                { idx_file_reader_iter = _index_file_readers.end(); })
431
12
                if (idx_file_reader_iter == _index_file_readers.end()) {
432
0
                    LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":"
433
0
                               << seg_ptr->id() << " cannot be found";
434
0
                    continue;
435
0
                }
436
12
                std::string index_path =
437
12
                        InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
438
12
                io::FileWriterPtr file_writer;
439
12
                Status st = fs->create_file(index_path, &file_writer);
440
12
                if (!st.ok()) {
441
0
                    LOG(WARNING) << "failed to create writable file. path=" << index_path
442
0
                                 << ", err: " << st;
443
0
                    return st;
444
0
                }
445
12
                auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories());
446
12
                index_file_writer = std::make_unique<IndexFileWriter>(
447
12
                        fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(),
448
12
                        seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(),
449
12
                        std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id());
450
12
                RETURN_IF_ERROR(index_file_writer->initialize(dirs));
451
12
            } else {
452
3
                index_file_writer = std::make_unique<IndexFileWriter>(
453
3
                        fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(),
454
3
                        seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(),
455
3
                        nullptr, true /* can_use_ram_dir */, _tablet->tablet_id());
456
3
            }
457
            // create inverted index writer, or ann index writer
458
19
            for (auto inverted_index : _alter_inverted_indexes) {
459
19
                DCHECK(inverted_index.index_type == TIndexType::INVERTED ||
460
19
                       inverted_index.index_type == TIndexType::ANN);
461
19
                DCHECK_EQ(inverted_index.columns.size(), 1);
462
19
                auto index_id = inverted_index.index_id;
463
19
                auto column_name = inverted_index.columns[0];
464
19
                auto column_idx = output_rowset_schema->field_index(column_name);
465
19
                if (column_idx < 0) {
466
4
                    if (inverted_index.__isset.column_unique_ids &&
467
4
                        !inverted_index.column_unique_ids.empty()) {
468
1
                        column_idx = output_rowset_schema->field_index(
469
1
                                inverted_index.column_unique_ids[0]);
470
1
                    }
471
4
                    if (column_idx < 0) {
472
3
                        LOG(WARNING) << "referenced column was missing. "
473
3
                                     << "[column=" << column_name
474
3
                                     << " referenced_column=" << column_idx << "]";
475
3
                        continue;
476
3
                    }
477
4
                }
478
16
                auto column = output_rowset_schema->column(column_idx);
479
                // variant column is not support for building index
480
16
                auto is_support_inverted_index =
481
16
                        IndexColumnWriter::check_support_inverted_index(column);
482
16
                auto is_support_ann_index = IndexColumnWriter::check_support_ann_index(column);
483
16
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index",
484
16
                                { is_support_inverted_index = false; })
485
16
                if (!is_support_inverted_index && !is_support_ann_index) {
486
0
                    continue;
487
0
                }
488
16
                DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id));
489
16
                _olap_data_convertor->add_column_data_convertor(column);
490
16
                return_columns.emplace_back(column_idx);
491
16
                std::unique_ptr<StorageField> field(StorageFieldFactory::create(column));
492
493
16
                if (inverted_index.index_type == TIndexType::INVERTED) {
494
                    // inverted index
495
15
                    auto index_metas = output_rowset_schema->inverted_indexs(column);
496
15
                    for (const auto& index_meta : index_metas) {
497
15
                        if (index_meta->index_id() != index_id) {
498
0
                            continue;
499
0
                        }
500
15
                        std::unique_ptr<segment_v2::IndexColumnWriter> inverted_index_builder;
501
15
                        try {
502
15
                            RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create(
503
15
                                    field.get(), &inverted_index_builder, index_file_writer.get(),
504
15
                                    index_meta));
505
15
                            DBUG_EXECUTE_IF(
506
15
                                    "IndexBuilder::handle_single_rowset_index_column_writer_create_"
507
15
                                    "error",
508
15
                                    {
509
15
                                        _CLTHROWA(CL_ERR_IO,
510
15
                                                  "debug point: "
511
15
                                                  "handle_single_rowset_index_column_writer_create_"
512
15
                                                  "error");
513
15
                                    })
514
15
                        } catch (const std::exception& e) {
515
0
                            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
516
0
                                    "CLuceneError occured: {}", e.what());
517
0
                        }
518
519
15
                        if (inverted_index_builder) {
520
15
                            auto writer_sign = std::make_pair(seg_ptr->id(), index_id);
521
15
                            _index_column_writers.insert(
522
15
                                    std::make_pair(writer_sign, std::move(inverted_index_builder)));
523
15
                            inverted_index_writer_signs.emplace_back(writer_sign);
524
15
                        }
525
15
                    }
526
15
                } else if (inverted_index.index_type == TIndexType::ANN) {
527
                    // ann index
528
1
                    const auto* index_meta = output_rowset_schema->ann_index(column);
529
1
                    if (index_meta && index_meta->index_id() == index_id) {
530
1
                        std::unique_ptr<segment_v2::IndexColumnWriter> index_writer;
531
1
                        try {
532
1
                            RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create(
533
1
                                    field.get(), &index_writer, index_file_writer.get(),
534
1
                                    index_meta));
535
1
                            DBUG_EXECUTE_IF(
536
1
                                    "IndexBuilder::handle_single_rowset_index_column_writer_create_"
537
1
                                    "error",
538
1
                                    {
539
1
                                        _CLTHROWA(CL_ERR_IO,
540
1
                                                  "debug point: "
541
1
                                                  "handle_single_rowset_index_column_writer_create_"
542
1
                                                  "error");
543
1
                                    })
544
1
                        } catch (const std::exception& e) {
545
0
                            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
546
0
                                    "CLuceneError occured: {}", e.what());
547
0
                        }
548
549
1
                        if (index_writer) {
550
1
                            auto writer_sign = std::make_pair(seg_ptr->id(), index_id);
551
1
                            _index_column_writers.insert(
552
1
                                    std::make_pair(writer_sign, std::move(index_writer)));
553
1
                            inverted_index_writer_signs.emplace_back(writer_sign);
554
1
                        }
555
1
                    }
556
1
                }
557
16
            }
558
559
            // DO NOT forget index_file_writer for the segment, otherwise, original inverted index will be deleted.
560
15
            _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
561
15
            if (return_columns.empty()) {
562
                // no columns to read
563
3
                continue;
564
3
            }
565
            // create iterator for each segment
566
12
            StorageReadOptions read_options;
567
12
            OlapReaderStatistics stats;
568
12
            read_options.stats = &stats;
569
12
            read_options.tablet_schema = output_rowset_schema;
570
12
            std::shared_ptr<Schema> schema =
571
12
                    std::make_shared<Schema>(output_rowset_schema->columns(), return_columns);
572
12
            std::unique_ptr<RowwiseIterator> iter;
573
12
            auto res = seg_ptr->new_iterator(schema, read_options, &iter);
574
12
            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", {
575
12
                res = Status::Error<ErrorCode::INTERNAL_ERROR>(
576
12
                        "debug point: handle_single_rowset_create_iterator_error");
577
12
            })
578
12
            if (!res.ok()) {
579
0
                LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
580
0
                             << "]: " << res.to_string();
581
0
                return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string());
582
0
            }
583
584
12
            auto block = Block::create_unique(output_rowset_schema->create_block(return_columns));
585
24
            while (true) {
586
24
                auto status = iter->next_batch(block.get());
587
24
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", {
588
24
                    status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
589
24
                            "next_batch fault injection");
590
24
                });
591
24
                if (!status.ok()) {
592
12
                    if (status.is<ErrorCode::END_OF_FILE>()) {
593
12
                        break;
594
12
                    }
595
12
                    LOG(WARNING)
596
0
                            << "failed to read next block when schema change for inverted index."
597
0
                            << ", err=" << status.to_string();
598
0
                    return status;
599
12
                }
600
601
                // write inverted index data, or ann index data
602
12
                status = _write_inverted_index_data(output_rowset_schema, iter->data_id(),
603
12
                                                    block.get());
604
12
                DBUG_EXECUTE_IF(
605
12
                        "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", {
606
12
                            status = Status::Error<ErrorCode::INTERNAL_ERROR>(
607
12
                                    "debug point: "
608
12
                                    "handle_single_rowset_write_inverted_index_data_error");
609
12
                        })
610
12
                if (!status.ok()) {
611
0
                    return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
612
0
                            "failed to write block.");
613
0
                }
614
12
                block->clear_column_data();
615
12
            }
616
617
            // finish write inverted index, flush data to compound file
618
16
            for (auto& writer_sign : inverted_index_writer_signs) {
619
16
                try {
620
16
                    if (_index_column_writers[writer_sign]) {
621
16
                        RETURN_IF_ERROR(_index_column_writers[writer_sign]->finish());
622
16
                    }
623
16
                    DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", {
624
16
                        _CLTHROWA(CL_ERR_IO,
625
16
                                  "debug point: handle_single_rowset_index_build_finish_error");
626
16
                    })
627
16
                } catch (const std::exception& e) {
628
0
                    return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
629
0
                            "CLuceneError occured: {}", e.what());
630
0
                }
631
16
            }
632
633
12
            _olap_data_convertor->reset();
634
12
        }
635
15
        for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
636
15
            auto st = index_file_writer->begin_close();
637
15
            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", {
638
15
                st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
639
15
                        "debug point: handle_single_rowset_file_writer_close_error");
640
15
            })
641
15
            if (!st.ok()) {
642
0
                LOG(ERROR) << "close index_file_writer error:" << st;
643
0
                return st;
644
0
            }
645
15
            inverted_index_size += index_file_writer->get_index_file_total_size();
646
15
        }
647
15
        for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
648
15
            auto st = index_file_writer->finish_close();
649
15
            if (!st.ok()) {
650
0
                LOG(ERROR) << "wait close index_file_writer error:" << st;
651
0
                return st;
652
0
            }
653
15
        }
654
12
        _index_column_writers.clear();
655
12
        _index_file_writers.clear();
656
12
        output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
657
12
        output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
658
12
                                                inverted_index_size);
659
12
        output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() +
660
12
                                                inverted_index_size);
661
12
        LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows();
662
12
    }
663
664
12
    return Status::OK();
665
16
}
666
667
Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, int64_t segment_idx,
668
12
                                                Block* block) {
669
12
    VLOG_DEBUG << "begin to write inverted/ann index";
670
    // converter block data
671
12
    _olap_data_convertor->set_source_content(block, 0, block->rows());
672
28
    for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) {
673
16
        auto inverted_index = _alter_inverted_indexes[i];
674
16
        auto index_id = inverted_index.index_id;
675
16
        auto column_name = inverted_index.columns[0];
676
16
        auto column_idx = tablet_schema->field_index(column_name);
677
16
        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative",
678
16
                        { column_idx = -1; })
679
16
        if (column_idx < 0) {
680
1
            if (!inverted_index.column_unique_ids.empty()) {
681
1
                auto column_unique_id = inverted_index.column_unique_ids[0];
682
1
                column_idx = tablet_schema->field_index(column_unique_id);
683
1
            }
684
1
            if (column_idx < 0) {
685
0
                LOG(WARNING) << "referenced column was missing. "
686
0
                             << "[column=" << column_name << " referenced_column=" << column_idx
687
0
                             << "]";
688
0
                continue;
689
0
            }
690
1
        }
691
16
        auto column = tablet_schema->column(column_idx);
692
16
        auto writer_sign = std::make_pair(segment_idx, index_id);
693
16
        std::unique_ptr<StorageField> field(StorageFieldFactory::create(column));
694
16
        auto converted_result = _olap_data_convertor->convert_column_data(i);
695
16
        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", {
696
16
            converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>(
697
16
                    "debug point: _write_inverted_index_data_convert_column_data_error");
698
16
        })
699
16
        if (converted_result.first != Status::OK()) {
700
0
            LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first;
701
0
            return converted_result.first;
702
0
        }
703
16
        const auto* ptr = (const uint8_t*)converted_result.second->get_data();
704
16
        const auto* null_map = converted_result.second->get_nullmap();
705
16
        if (null_map) {
706
0
            RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(), null_map, &ptr,
707
0
                                          block->rows()));
708
16
        } else {
709
16
            RETURN_IF_ERROR(_add_data(column_name, writer_sign, field.get(), &ptr, block->rows()));
710
16
        }
711
16
    }
712
12
    _olap_data_convertor->clear_source_content();
713
714
12
    return Status::OK();
715
12
}
716
717
Status IndexBuilder::_add_nullable(const std::string& column_name,
718
                                   const std::pair<int64_t, int64_t>& index_writer_sign,
719
                                   StorageField* field, const uint8_t* null_map,
720
0
                                   const uint8_t** ptr, size_t num_rows) {
721
    // TODO: need to process null data for inverted index
722
0
    if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
723
0
        DCHECK(field->get_sub_field_count() == 1);
724
        // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
725
0
        const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
726
        // total number length
727
0
        auto offset_data = *(data_ptr + 1);
728
0
        const auto* offsets_ptr = (const uint8_t*)offset_data;
729
0
        try {
730
0
            auto data = *(data_ptr + 2);
731
0
            auto nested_null_map = *(data_ptr + 3);
732
0
            RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values(
733
0
                    field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data),
734
0
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
735
0
            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", {
736
0
                _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error");
737
0
            })
738
0
            RETURN_IF_ERROR(
739
0
                    _index_column_writers[index_writer_sign]->add_array_nulls(null_map, num_rows));
740
0
        } catch (const std::exception& e) {
741
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
742
0
                    "CLuceneError occured: {}", e.what());
743
0
        }
744
745
0
        return Status::OK();
746
0
    }
747
0
    size_t offset = 0;
748
0
    auto next_run_step = [&]() {
749
0
        size_t step = 1;
750
0
        for (auto i = offset + 1; i < num_rows; ++i) {
751
0
            if (null_map[offset] == null_map[i]) {
752
0
                step++;
753
0
            } else {
754
0
                break;
755
0
            }
756
0
        }
757
0
        return step;
758
0
    };
759
0
    try {
760
0
        do {
761
0
            auto step = next_run_step();
762
0
            if (null_map[offset]) {
763
0
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_nulls(
764
0
                        static_cast<uint32_t>(step)));
765
0
            } else {
766
0
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name,
767
0
                                                                                     *ptr, step));
768
0
            }
769
0
            *ptr += field->size() * step;
770
0
            offset += step;
771
0
            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception",
772
0
                            { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); })
773
0
        } while (offset < num_rows);
774
0
    } catch (const std::exception& e) {
775
0
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
776
0
                                                                      e.what());
777
0
    }
778
779
0
    return Status::OK();
780
0
}
781
782
Status IndexBuilder::_add_data(const std::string& column_name,
783
                               const std::pair<int64_t, int64_t>& index_writer_sign,
784
16
                               StorageField* field, const uint8_t** ptr, size_t num_rows) {
785
16
    try {
786
16
        if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
787
2
            DCHECK(field->get_sub_field_count() == 1);
788
            // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
789
2
            const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
790
            // total number length
791
2
            auto element_cnt = size_t((unsigned long)(*data_ptr));
792
2
            auto offset_data = *(data_ptr + 1);
793
2
            const auto* offsets_ptr = (const uint8_t*)offset_data;
794
2
            if (element_cnt > 0) {
795
2
                auto data = *(data_ptr + 2);
796
2
                auto nested_null_map = *(data_ptr + 3);
797
2
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values(
798
2
                        field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data),
799
2
                        reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
800
2
            }
801
14
        } else {
802
14
            RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, *ptr,
803
14
                                                                                 num_rows));
804
14
        }
805
16
        DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception",
806
16
                        { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); })
807
16
    } catch (const std::exception& e) {
808
0
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
809
0
                                                                      e.what());
810
0
    }
811
812
16
    return Status::OK();
813
16
}
814
815
17
Status IndexBuilder::handle_inverted_index_data() {
816
17
    LOG(INFO) << "begin to handle_inverted_index_data";
817
17
    DCHECK(_input_rowsets.size() == _output_rowsets.size());
818
17
    for (auto& _output_rowset : _output_rowsets) {
819
17
        SegmentCacheHandle segment_cache_handle;
820
17
        RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
821
17
                std::static_pointer_cast<BetaRowset>(_output_rowset), &segment_cache_handle));
822
17
        auto output_rowset_meta = _output_rowset->rowset_meta();
823
17
        auto& segments = segment_cache_handle.get_segments();
824
17
        RETURN_IF_ERROR(handle_single_rowset(output_rowset_meta, segments));
825
17
    }
826
16
    return Status::OK();
827
17
}
828
829
21
Status IndexBuilder::do_build_inverted_index() {
830
21
    LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id()
831
21
              << ", is_drop_op=" << _is_drop_op;
832
21
    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty",
833
21
                    { _alter_inverted_indexes.clear(); })
834
21
    if (_alter_inverted_indexes.empty()) {
835
0
        return Status::OK();
836
0
    }
837
838
21
    static constexpr long TRY_LOCK_TIMEOUT = 30;
839
21
    std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::defer_lock);
840
21
    bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT));
841
842
21
    if (!owns_lock) {
843
0
        return Status::ObtainLockFailed(
844
0
                "try schema_change_lock failed. There might be schema change or cooldown running "
845
0
                "on "
846
0
                "tablet={} ",
847
0
                _tablet->tablet_id());
848
0
    }
849
    // Check executing serially with compaction task.
850
21
    std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
851
21
                                                      std::try_to_lock);
852
21
    if (!base_compaction_lock.owns_lock()) {
853
0
        return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ",
854
0
                                        _tablet->tablet_id());
855
0
    }
856
21
    std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
857
21
                                                      std::try_to_lock);
858
21
    if (!cumu_compaction_lock.owns_lock()) {
859
0
        return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}",
860
0
                                        _tablet->tablet_id());
861
0
    }
862
863
21
    std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
864
21
                                                      std::try_to_lock);
865
21
    if (!cold_compaction_lock.owns_lock()) {
866
0
        return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}",
867
0
                                        _tablet->tablet_id());
868
0
    }
869
870
21
    std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
871
21
                                                           std::try_to_lock);
872
21
    if (!build_inverted_index_lock.owns_lock()) {
873
0
        return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}",
874
0
                                        _tablet->tablet_id());
875
0
    }
876
877
21
    std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
878
21
    if (!migration_rlock.owns_lock()) {
879
0
        return Status::ObtainLockFailed("got migration_rlock failed. tablet={}",
880
0
                                        _tablet->tablet_id());
881
0
    }
882
883
21
    DCHECK(!_alter_index_ids.empty());
884
21
    _input_rowsets =
885
21
            _tablet->pick_candidate_rowsets_to_build_inverted_index(_alter_index_ids, _is_drop_op);
886
21
    if (_input_rowsets.empty()) {
887
1
        LOG(INFO) << "_input_rowsets is empty";
888
1
        return Status::OK();
889
1
    }
890
891
20
    auto st = update_inverted_index_info();
892
20
    if (!st.ok()) {
893
3
        LOG(WARNING) << "failed to update_inverted_index_info. "
894
3
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
895
3
        gc_output_rowset();
896
3
        return st;
897
3
    }
898
899
    // create inverted index file for output rowset
900
17
    st = handle_inverted_index_data();
901
17
    if (!st.ok()) {
902
1
        LOG(WARNING) << "failed to handle_inverted_index_data. "
903
1
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
904
1
        gc_output_rowset();
905
1
        return st;
906
1
    }
907
908
    // modify rowsets in memory
909
16
    st = modify_rowsets();
910
16
    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", {
911
16
        st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>(
912
16
                "debug point: do_build_inverted_index_modify_rowsets_status_error");
913
16
    })
914
16
    if (!st.ok()) {
915
0
        LOG(WARNING) << "failed to modify rowsets in memory. "
916
0
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
917
0
        gc_output_rowset();
918
0
        return st;
919
0
    }
920
16
    return Status::OK();
921
16
}
922
923
16
Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) {
924
16
    DCHECK(std::ranges::all_of(
925
16
            _output_rowsets.begin(), _output_rowsets.end(), [&engine = _engine](auto&& rs) {
926
16
                if (engine.check_rowset_id_in_unused_rowsets(rs->rowset_id())) {
927
16
                    LOG(ERROR) << "output rowset: " << rs->rowset_id() << " in unused rowsets";
928
16
                    return false;
929
16
                }
930
16
                return true;
931
16
            }));
932
933
16
    if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
934
16
        _tablet->enable_unique_key_merge_on_write()) {
935
1
        std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock());
936
1
        std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
937
1
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
938
1
        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id());
939
2
        for (auto i = 0; i < _input_rowsets.size(); ++i) {
940
1
            RowsetId input_rowset_id = _input_rowsets[i]->rowset_id();
941
1
            RowsetId output_rowset_id = _output_rowsets[i]->rowset_id();
942
1
            for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) {
943
0
                RowsetId rs_id = std::get<0>(k);
944
0
                if (rs_id == input_rowset_id) {
945
0
                    DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k),
946
0
                                                             std::get<2>(k)};
947
0
                    auto res = delete_bitmap->set(output_rs_key, v);
948
0
                    DCHECK(res > 0) << "delete_bitmap set failed, res=" << res;
949
0
                }
950
0
            }
951
1
        }
952
1
        _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
953
954
        // modify_rowsets will remove the delete_bitmap for input rowsets,
955
        // should call it after merge delete_bitmap
956
1
        RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true));
957
15
    } else {
958
15
        std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
959
15
        RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true));
960
15
    }
961
962
#ifndef BE_TEST
963
    {
964
        std::shared_lock rlock(_tablet->get_header_lock());
965
        _tablet->save_meta();
966
    }
967
#endif
968
16
    return Status::OK();
969
16
}
970
971
4
void IndexBuilder::gc_output_rowset() {
972
4
    for (auto&& output_rowset : _output_rowsets) {
973
2
        auto is_local_rowset = output_rowset->is_local();
974
2
        DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset",
975
2
                        { is_local_rowset = false; })
976
2
        if (!is_local_rowset) {
977
0
            _tablet->record_unused_remote_rowset(output_rowset->rowset_id(),
978
0
                                                 output_rowset->rowset_meta()->resource_id(),
979
0
                                                 output_rowset->num_segments());
980
0
            return;
981
0
        }
982
2
        _engine.add_unused_rowset(output_rowset);
983
2
    }
984
4
}
985
986
} // namespace doris