Coverage Report

Created: 2026-07-02 14:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/task/index_builder.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/task/index_builder.h"
19
20
#include <mutex>
21
22
#include "common/logging.h"
23
#include "common/status.h"
24
#include "storage/index/index_file_reader.h"
25
#include "storage/index/index_file_writer.h"
26
#include "storage/index/inverted/inverted_index_desc.h"
27
#include "storage/index/inverted/inverted_index_fs_directory.h"
28
#include "storage/olap_define.h"
29
#include "storage/rowset/beta_rowset.h"
30
#include "storage/rowset/rowset_writer_context.h"
31
#include "storage/segment/segment_loader.h"
32
#include "storage/storage_engine.h"
33
#include "storage/tablet/tablet_schema.h"
34
#include "util/debug_points.h"
35
#include "util/trace.h"
36
37
namespace doris {
38
39
IndexBuilder::IndexBuilder(StorageEngine& engine, TabletSharedPtr tablet,
40
                           const std::vector<TColumn>& columns,
41
                           const std::vector<doris::TOlapTableIndex>& alter_inverted_indexes,
42
                           bool is_drop_op)
43
50
        : _engine(engine),
44
50
          _tablet(std::move(tablet)),
45
50
          _columns(columns),
46
50
          _alter_inverted_indexes(alter_inverted_indexes),
47
50
          _is_drop_op(is_drop_op) {
48
50
    _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
49
50
}
50
51
50
IndexBuilder::~IndexBuilder() {
52
50
    _olap_data_convertor.reset();
53
50
    _index_column_writers.clear();
54
50
}
55
56
50
Status IndexBuilder::init() {
57
54
    for (auto inverted_index : _alter_inverted_indexes) {
58
54
        _alter_index_ids.insert(inverted_index.index_id);
59
54
    }
60
50
    return Status::OK();
61
50
}
62
63
42
Status IndexBuilder::update_inverted_index_info() {
64
    // just do link files
65
42
    LOG(INFO) << "begin to update_inverted_index_info, tablet=" << _tablet->tablet_id()
66
42
              << ", is_drop_op=" << _is_drop_op;
67
    // index ids that will not be linked
68
42
    std::set<int64_t> without_index_uids;
69
42
    _output_rowsets.reserve(_input_rowsets.size());
70
42
    _pending_rs_guards.reserve(_input_rowsets.size());
71
44
    for (auto&& input_rowset : _input_rowsets) {
72
44
        bool is_local_rowset = input_rowset->is_local();
73
44
        DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset",
74
44
                        { is_local_rowset = false; })
75
44
        if (!is_local_rowset) [[unlikely]] {
76
            // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id();
77
0
            return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}",
78
0
                                         _tablet->tablet_id(),
79
0
                                         input_rowset->rowset_id().to_string());
80
0
        }
81
82
44
        TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>();
83
44
        const auto& input_rs_tablet_schema = input_rowset->tablet_schema();
84
44
        output_rs_tablet_schema->copy_from(*input_rs_tablet_schema);
85
44
        int64_t total_index_size = 0;
86
44
        auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get());
87
44
        auto size_st = beta_rowset->get_inverted_index_size(&total_index_size);
88
44
        DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", {
89
44
            size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed");
90
44
        })
91
44
        if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() &&
92
44
            !size_st.is<ErrorCode::NOT_FOUND>()) {
93
0
            return size_st;
94
0
        }
95
44
        auto num_segments = input_rowset->num_segments();
96
44
        size_t drop_index_size = 0;
97
98
44
        if (_is_drop_op) {
99
12
            for (const auto& t_inverted_index : _alter_inverted_indexes) {
100
12
                DCHECK_EQ(t_inverted_index.columns.size(), 1);
101
12
                auto column_name = t_inverted_index.columns[0];
102
12
                auto column_idx = output_rs_tablet_schema->field_index(column_name);
103
12
                if (column_idx < 0) {
104
0
                    if (!t_inverted_index.column_unique_ids.empty()) {
105
0
                        auto column_unique_id = t_inverted_index.column_unique_ids[0];
106
0
                        column_idx = output_rs_tablet_schema->field_index(column_unique_id);
107
0
                    }
108
0
                    if (column_idx < 0) {
109
0
                        LOG(WARNING) << "referenced column was missing. "
110
0
                                     << "[column=" << column_name
111
0
                                     << " referenced_column=" << column_idx << "]";
112
0
                        continue;
113
0
                    }
114
0
                }
115
12
                auto column = output_rs_tablet_schema->column(column_idx);
116
117
                // inverted index
118
12
                auto index_metas = output_rs_tablet_schema->inverted_indexs(column);
119
12
                for (const auto& index_meta : index_metas) {
120
                    // Only drop the index that matches the requested index_id,
121
                    // not all indexes on this column
122
12
                    if (index_meta->index_id() != t_inverted_index.index_id) {
123
2
                        continue;
124
2
                    }
125
10
                    if (output_rs_tablet_schema->get_inverted_index_storage_format() ==
126
10
                        InvertedIndexStorageFormatPB::V1) {
127
2
                        const auto& fs = io::global_local_filesystem();
128
129
4
                        for (int seg_id = 0; seg_id < num_segments; seg_id++) {
130
2
                            auto seg_path = local_segment_path(
131
2
                                    _tablet->tablet_path(), input_rowset->rowset_id().to_string(),
132
2
                                    seg_id);
133
2
                            auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
134
2
                                    InvertedIndexDescriptor::get_index_file_path_prefix(seg_path),
135
2
                                    index_meta->index_id(), index_meta->get_index_suffix());
136
2
                            int64_t index_size = 0;
137
2
                            RETURN_IF_ERROR(fs->file_size(index_path, &index_size));
138
2
                            VLOG_DEBUG << "inverted index file:" << index_path
139
0
                                       << " size:" << index_size;
140
2
                            drop_index_size += index_size;
141
2
                        }
142
2
                    }
143
10
                    _dropped_inverted_indexes.push_back(*index_meta);
144
                    // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED.
145
                    // remove dropped index_meta from output rowset tablet schema
146
10
                    output_rs_tablet_schema->remove_index(index_meta->index_id());
147
10
                }
148
149
                // ann index
150
12
                const auto* ann_index = output_rs_tablet_schema->ann_index(column);
151
12
                if (!ann_index) {
152
10
                    continue;
153
10
                }
154
                // Only drop the ann index that matches the requested index_id
155
2
                if (ann_index->index_id() != t_inverted_index.index_id) {
156
0
                    continue;
157
0
                }
158
2
                DCHECK(output_rs_tablet_schema->get_inverted_index_storage_format() !=
159
2
                       InvertedIndexStorageFormatPB::V1);
160
2
                _dropped_inverted_indexes.push_back(*ann_index);
161
                // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED.
162
                // remove dropped index_meta from output rowset tablet schema
163
2
                output_rs_tablet_schema->remove_index(ann_index->index_id());
164
2
            }
165
166
12
            DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", {
167
12
                auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
168
12
                        "index_builder.update_inverted_index_info.drop_index", "indexes_count", 0);
169
12
                if (indexes_count < 0) {
170
12
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
171
12
                            "indexes count cannot be negative");
172
12
                }
173
12
                auto indexes_size = output_rs_tablet_schema->inverted_indexes().size();
174
12
                if (indexes_count != indexes_size) {
175
12
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
176
12
                            "indexes count not equal to expected");
177
12
                }
178
12
            })
179
32
        } else {
180
            // base on input rowset's tablet_schema to build
181
            // output rowset's tablet_schema which only add
182
            // the indexes specified in this build index request
183
36
            for (auto t_inverted_index : _alter_inverted_indexes) {
184
36
                TabletIndex index;
185
36
                index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema);
186
36
                auto column_uid = index.col_unique_ids()[0];
187
36
                if (column_uid < 0) {
188
6
                    LOG(WARNING) << "referenced column was missing. "
189
6
                                 << "[column=" << t_inverted_index.columns[0]
190
6
                                 << " referenced_column=" << column_uid << "]";
191
6
                    continue;
192
6
                }
193
30
                const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid);
194
195
                // inverted index
196
30
                auto exist_indexs = output_rs_tablet_schema->inverted_indexs(col);
197
30
                for (const auto& exist_index : exist_indexs) {
198
0
                    if (exist_index->index_id() != index.index_id()) {
199
0
                        if (exist_index->is_same_except_id(&index)) {
200
0
                            LOG(WARNING) << fmt::format(
201
0
                                    "column: {} has a exist inverted index, but the index id not "
202
0
                                    "equal "
203
0
                                    "request's index id, , exist index id: {}, request's index id: "
204
0
                                    "{}, "
205
0
                                    "remove exist index in new output_rs_tablet_schema",
206
0
                                    column_uid, exist_index->index_id(), index.index_id());
207
0
                            without_index_uids.insert(exist_index->index_id());
208
0
                            output_rs_tablet_schema->remove_index(exist_index->index_id());
209
0
                        }
210
0
                    }
211
0
                }
212
213
                // ann index
214
30
                const auto* exist_index = output_rs_tablet_schema->ann_index(col);
215
30
                if (exist_index && exist_index->index_id() != index.index_id()) {
216
0
                    if (exist_index->is_same_except_id(&index)) {
217
0
                        LOG(WARNING) << fmt::format(
218
0
                                "column: {} has a exist ann index, but the index id not "
219
0
                                "equal request's index id, , exist index id: {}, request's index "
220
0
                                "id: {}, remove exist index in new output_rs_tablet_schema",
221
0
                                column_uid, exist_index->index_id(), index.index_id());
222
0
                        without_index_uids.insert(exist_index->index_id());
223
0
                        output_rs_tablet_schema->remove_index(exist_index->index_id());
224
0
                    }
225
0
                }
226
227
30
                output_rs_tablet_schema->append_index(std::move(index));
228
30
            }
229
32
        }
230
        // construct input rowset reader
231
44
        RowsetReaderSharedPtr input_rs_reader;
232
44
        RETURN_IF_ERROR(input_rowset->create_reader(&input_rs_reader));
233
        // construct output rowset writer
234
44
        RowsetWriterContext context;
235
44
        context.version = input_rs_reader->version();
236
44
        context.rowset_state = VISIBLE;
237
44
        context.segments_overlap = input_rowset->rowset_meta()->segments_overlap();
238
44
        context.tablet_schema = output_rs_tablet_schema;
239
44
        context.newest_write_timestamp = input_rs_reader->newest_write_timestamp();
240
44
        auto output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
241
44
        _pending_rs_guards.push_back(_engine.add_pending_rowset(context));
242
243
        // if without_index_uids is not empty, copy _alter_index_ids to it
244
        // else just use _alter_index_ids to avoid copy
245
44
        if (!without_index_uids.empty()) {
246
0
            without_index_uids.insert(_alter_index_ids.begin(), _alter_index_ids.end());
247
0
        }
248
249
        // build output rowset
250
44
        RETURN_IF_ERROR(input_rowset->link_files_to(
251
44
                _tablet->tablet_path(), output_rs_writer->rowset_id(), 0,
252
44
                without_index_uids.empty() ? &_alter_index_ids : &without_index_uids));
253
254
44
        auto input_rowset_meta = input_rowset->rowset_meta();
255
44
        RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
256
44
        rowset_meta->set_num_rows(input_rowset_meta->num_rows());
257
44
        if (output_rs_tablet_schema->get_inverted_index_storage_format() ==
258
44
            InvertedIndexStorageFormatPB::V1) {
259
8
            if (_is_drop_op) {
260
2
                VLOG_DEBUG << "data_disk_size:" << input_rowset_meta->data_disk_size()
261
0
                           << " total_disk_size:" << input_rowset_meta->total_disk_size()
262
0
                           << " index_disk_size:" << input_rowset_meta->index_disk_size()
263
0
                           << " drop_index_size:" << drop_index_size;
264
2
                rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() -
265
2
                                                 drop_index_size);
266
2
                rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
267
2
                rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() -
268
2
                                                 drop_index_size);
269
6
            } else {
270
6
                rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size());
271
6
                rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
272
6
                rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size());
273
6
            }
274
36
        } else {
275
78
            for (int seg_id = 0; seg_id < num_segments; seg_id++) {
276
42
                auto seg_path = DORIS_TRY(input_rowset->segment_path(seg_id));
277
42
                auto idx_file_reader = std::make_unique<IndexFileReader>(
278
42
                        context.fs(),
279
42
                        std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
280
42
                        output_rs_tablet_schema->get_inverted_index_storage_format(),
281
42
                        InvertedIndexFileInfo(), _tablet->tablet_id());
282
42
                auto st = idx_file_reader->init();
283
42
                DBUG_EXECUTE_IF(
284
42
                        "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", {
285
42
                            st = Status::Error<ErrorCode::INIT_FAILED>(
286
42
                                    "debug point: reader init error");
287
42
                        })
288
42
                if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
289
0
                    return st;
290
0
                }
291
42
                _index_file_readers.emplace(
292
42
                        std::make_pair(output_rs_writer->rowset_id().to_string(), seg_id),
293
42
                        std::move(idx_file_reader));
294
42
            }
295
36
            rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() -
296
36
                                             total_index_size);
297
36
            rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size());
298
36
            rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() -
299
36
                                             total_index_size);
300
36
        }
301
44
        rowset_meta->set_empty(input_rowset_meta->empty());
302
44
        rowset_meta->set_num_segments(input_rowset_meta->num_segments());
303
44
        rowset_meta->set_segments_overlap(input_rowset_meta->segments_overlap());
304
44
        rowset_meta->set_rowset_state(input_rowset_meta->rowset_state());
305
44
        std::vector<KeyBoundsPB> key_bounds;
306
44
        RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
307
44
        rowset_meta->set_segments_key_bounds_truncated(
308
44
                input_rowset_meta->is_segments_key_bounds_truncated());
309
        // preserve aggregated layout via the setter so the aggregated flag is not
310
        // clobbered by set_segments_key_bounds's default reset path.
311
44
        rowset_meta->set_segments_key_bounds(
312
44
                key_bounds, input_rowset_meta->is_segments_key_bounds_aggregated());
313
44
        std::vector<uint32_t> num_segment_rows;
314
44
        input_rowset_meta->get_num_segment_rows(&num_segment_rows);
315
44
        rowset_meta->set_num_segment_rows(num_segment_rows);
316
44
        auto output_rowset = output_rs_writer->manual_build(rowset_meta);
317
44
        if (input_rowset_meta->has_delete_predicate()) {
318
0
            output_rowset->rowset_meta()->set_delete_predicate(
319
0
                    input_rowset_meta->delete_predicate());
320
0
        }
321
44
        _output_rowsets.push_back(output_rowset);
322
44
    }
323
324
42
    return Status::OK();
325
42
}
326
327
Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta,
328
40
                                          std::vector<segment_v2::SegmentSharedPtr>& segments) {
329
40
    bool is_local_rowset = output_rowset_meta->is_local();
330
40
    DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset",
331
40
                    { is_local_rowset = false; })
332
40
    if (!is_local_rowset) [[unlikely]] {
333
        // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id();
334
0
        return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}",
335
0
                                     _tablet->tablet_id(),
336
0
                                     output_rowset_meta->rowset_id().to_string());
337
0
    }
338
339
40
    if (_is_drop_op) {
340
12
        const auto& output_rs_tablet_schema = output_rowset_meta->tablet_schema();
341
12
        if (output_rs_tablet_schema->get_inverted_index_storage_format() !=
342
12
            InvertedIndexStorageFormatPB::V1) {
343
10
            const auto& fs = output_rowset_meta->fs();
344
345
10
            const auto& output_rowset_schema = output_rowset_meta->tablet_schema();
346
10
            size_t inverted_index_size = 0;
347
10
            for (auto& seg_ptr : segments) {
348
10
                auto idx_file_reader_iter = _index_file_readers.find(
349
10
                        std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
350
10
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op",
351
10
                                { idx_file_reader_iter = _index_file_readers.end(); })
352
10
                if (idx_file_reader_iter == _index_file_readers.end()) {
353
0
                    LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":"
354
0
                               << seg_ptr->id() << " cannot be found";
355
0
                    continue;
356
0
                }
357
10
                auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories());
358
359
10
                std::string index_path_prefix {
360
10
                        InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path(
361
10
                                _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(),
362
10
                                seg_ptr->id()))};
363
364
10
                std::string index_path =
365
10
                        InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
366
10
                io::FileWriterPtr file_writer;
367
10
                Status st = fs->create_file(index_path, &file_writer);
368
10
                if (!st.ok()) {
369
0
                    LOG(WARNING) << "failed to create writable file. path=" << index_path
370
0
                                 << ", err: " << st;
371
0
                    return st;
372
0
                }
373
10
                auto index_file_writer = std::make_unique<IndexFileWriter>(
374
10
                        fs, std::move(index_path_prefix),
375
10
                        output_rowset_meta->rowset_id().to_string(), seg_ptr->id(),
376
10
                        output_rowset_schema->get_inverted_index_storage_format(),
377
10
                        std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id());
378
10
                RETURN_IF_ERROR(index_file_writer->initialize(dirs));
379
                // create inverted index writer
380
10
                for (auto& index_meta : _dropped_inverted_indexes) {
381
10
                    RETURN_IF_ERROR(index_file_writer->delete_index(&index_meta));
382
10
                }
383
10
                _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
384
10
            }
385
10
            for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
386
10
                auto st = index_file_writer->begin_close();
387
10
                if (!st.ok()) {
388
0
                    LOG(ERROR) << "close index_file_writer error:" << st;
389
0
                    return st;
390
0
                }
391
10
                inverted_index_size += index_file_writer->get_index_file_total_size();
392
10
            }
393
10
            for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
394
10
                auto st = index_file_writer->finish_close();
395
10
                if (!st.ok()) {
396
0
                    LOG(ERROR) << "wait close index_file_writer error:" << st;
397
0
                    return st;
398
0
                }
399
10
            }
400
10
            _index_file_writers.clear();
401
10
            output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
402
10
            output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
403
10
                                                    inverted_index_size);
404
10
            output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() +
405
10
                                                    inverted_index_size);
406
10
        }
407
12
        LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows();
408
12
        return Status::OK();
409
28
    } else {
410
        // create inverted or ann index writer
411
28
        const auto& fs = output_rowset_meta->fs();
412
28
        auto output_rowset_schema = output_rowset_meta->tablet_schema();
413
28
        size_t inverted_index_size = 0;
414
34
        for (auto& seg_ptr : segments) {
415
34
            std::string index_path_prefix {
416
34
                    InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path(
417
34
                            _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(),
418
34
                            seg_ptr->id()))};
419
34
            std::vector<ColumnId> return_columns;
420
34
            std::vector<std::pair<int64_t, int64_t>> inverted_index_writer_signs;
421
34
            _olap_data_convertor->reserve(_alter_inverted_indexes.size());
422
423
34
            std::unique_ptr<IndexFileWriter> index_file_writer = nullptr;
424
34
            if (output_rowset_schema->get_inverted_index_storage_format() >=
425
34
                InvertedIndexStorageFormatPB::V2) {
426
28
                auto idx_file_reader_iter = _index_file_readers.find(
427
28
                        std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
428
28
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader",
429
28
                                { idx_file_reader_iter = _index_file_readers.end(); })
430
28
                if (idx_file_reader_iter == _index_file_readers.end()) {
431
0
                    LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":"
432
0
                               << seg_ptr->id() << " cannot be found";
433
0
                    continue;
434
0
                }
435
28
                std::string index_path =
436
28
                        InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
437
28
                io::FileWriterPtr file_writer;
438
28
                Status st = fs->create_file(index_path, &file_writer);
439
28
                if (!st.ok()) {
440
0
                    LOG(WARNING) << "failed to create writable file. path=" << index_path
441
0
                                 << ", err: " << st;
442
0
                    return st;
443
0
                }
444
28
                auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories());
445
28
                index_file_writer = std::make_unique<IndexFileWriter>(
446
28
                        fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(),
447
28
                        seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(),
448
28
                        std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id());
449
28
                RETURN_IF_ERROR(index_file_writer->initialize(dirs));
450
28
            } else {
451
6
                index_file_writer = std::make_unique<IndexFileWriter>(
452
6
                        fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(),
453
6
                        seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(),
454
6
                        nullptr, true /* can_use_ram_dir */, _tablet->tablet_id());
455
6
            }
456
            // create inverted index writer, or ann index writer
457
42
            for (auto inverted_index : _alter_inverted_indexes) {
458
42
                DCHECK(inverted_index.index_type == TIndexType::INVERTED ||
459
42
                       inverted_index.index_type == TIndexType::ANN);
460
42
                DCHECK_EQ(inverted_index.columns.size(), 1);
461
42
                auto index_id = inverted_index.index_id;
462
42
                auto column_name = inverted_index.columns[0];
463
42
                auto column_idx = output_rowset_schema->field_index(column_name);
464
42
                if (column_idx < 0) {
465
8
                    if (inverted_index.__isset.column_unique_ids &&
466
8
                        !inverted_index.column_unique_ids.empty()) {
467
2
                        column_idx = output_rowset_schema->field_index(
468
2
                                inverted_index.column_unique_ids[0]);
469
2
                    }
470
8
                    if (column_idx < 0) {
471
6
                        LOG(WARNING) << "referenced column was missing. "
472
6
                                     << "[column=" << column_name
473
6
                                     << " referenced_column=" << column_idx << "]";
474
6
                        continue;
475
6
                    }
476
8
                }
477
36
                auto column = output_rowset_schema->column(column_idx);
478
                // variant column is not support for building index
479
36
                auto is_support_inverted_index =
480
36
                        IndexColumnWriter::check_support_inverted_index(column);
481
36
                auto is_support_ann_index = IndexColumnWriter::check_support_ann_index(column);
482
36
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index",
483
36
                                { is_support_inverted_index = false; })
484
36
                if (!is_support_inverted_index && !is_support_ann_index) {
485
0
                    continue;
486
0
                }
487
36
                DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id));
488
36
                _olap_data_convertor->add_column_data_convertor(column);
489
36
                return_columns.emplace_back(column_idx);
490
491
36
                if (inverted_index.index_type == TIndexType::INVERTED) {
492
                    // inverted index
493
34
                    auto index_metas = output_rowset_schema->inverted_indexs(column);
494
34
                    for (const auto& index_meta : index_metas) {
495
34
                        if (index_meta->index_id() != index_id) {
496
0
                            continue;
497
0
                        }
498
34
                        std::unique_ptr<segment_v2::IndexColumnWriter> inverted_index_builder;
499
34
                        try {
500
34
                            RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create(
501
34
                                    &column, &inverted_index_builder, index_file_writer.get(),
502
34
                                    index_meta));
503
34
                            DBUG_EXECUTE_IF(
504
34
                                    "IndexBuilder::handle_single_rowset_index_column_writer_create_"
505
34
                                    "error",
506
34
                                    {
507
34
                                        _CLTHROWA(CL_ERR_IO,
508
34
                                                  "debug point: "
509
34
                                                  "handle_single_rowset_index_column_writer_create_"
510
34
                                                  "error");
511
34
                                    })
512
34
                        } catch (const std::exception& e) {
513
0
                            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
514
0
                                    "CLuceneError occurred: {}", e.what());
515
0
                        }
516
517
34
                        if (inverted_index_builder) {
518
34
                            auto writer_sign = std::make_pair(seg_ptr->id(), index_id);
519
34
                            _index_column_writers.insert(
520
34
                                    std::make_pair(writer_sign, std::move(inverted_index_builder)));
521
34
                            inverted_index_writer_signs.emplace_back(writer_sign);
522
34
                        }
523
34
                    }
524
34
                } else if (inverted_index.index_type == TIndexType::ANN) {
525
                    // ann index
526
2
                    const auto* index_meta = output_rowset_schema->ann_index(column);
527
2
                    if (index_meta && index_meta->index_id() == index_id) {
528
2
                        std::unique_ptr<segment_v2::IndexColumnWriter> index_writer;
529
2
                        try {
530
2
                            RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create(
531
2
                                    &column, &index_writer, index_file_writer.get(), index_meta));
532
2
                            DBUG_EXECUTE_IF(
533
2
                                    "IndexBuilder::handle_single_rowset_index_column_writer_create_"
534
2
                                    "error",
535
2
                                    {
536
2
                                        _CLTHROWA(CL_ERR_IO,
537
2
                                                  "debug point: "
538
2
                                                  "handle_single_rowset_index_column_writer_create_"
539
2
                                                  "error");
540
2
                                    })
541
2
                        } catch (const std::exception& e) {
542
0
                            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
543
0
                                    "CLuceneError occurred: {}", e.what());
544
0
                        }
545
546
2
                        if (index_writer) {
547
2
                            auto writer_sign = std::make_pair(seg_ptr->id(), index_id);
548
2
                            _index_column_writers.insert(
549
2
                                    std::make_pair(writer_sign, std::move(index_writer)));
550
2
                            inverted_index_writer_signs.emplace_back(writer_sign);
551
2
                        }
552
2
                    }
553
2
                }
554
36
            }
555
556
            // DO NOT forget index_file_writer for the segment, otherwise, original inverted index will be deleted.
557
34
            _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer));
558
34
            if (return_columns.empty()) {
559
                // no columns to read
560
6
                continue;
561
6
            }
562
            // create iterator for each segment
563
28
            StorageReadOptions read_options;
564
28
            OlapReaderStatistics stats;
565
28
            read_options.stats = &stats;
566
28
            read_options.tablet_schema = output_rowset_schema;
567
28
            std::shared_ptr<Schema> schema =
568
28
                    std::make_shared<Schema>(output_rowset_schema->columns(), return_columns);
569
28
            std::unique_ptr<RowwiseIterator> iter;
570
28
            auto res = seg_ptr->new_iterator(schema, read_options, &iter);
571
28
            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", {
572
28
                res = Status::Error<ErrorCode::INTERNAL_ERROR>(
573
28
                        "debug point: handle_single_rowset_create_iterator_error");
574
28
            })
575
28
            if (!res.ok()) {
576
0
                LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
577
0
                             << "]: " << res.to_string();
578
0
                return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string());
579
0
            }
580
581
28
            auto block = Block::create_unique(output_rowset_schema->create_block(return_columns));
582
56
            while (true) {
583
56
                auto status = iter->next_batch(block.get());
584
56
                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", {
585
56
                    status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
586
56
                            "next_batch fault injection");
587
56
                });
588
56
                if (!status.ok()) {
589
28
                    if (status.is<ErrorCode::END_OF_FILE>()) {
590
28
                        break;
591
28
                    }
592
28
                    LOG(WARNING)
593
0
                            << "failed to read next block when schema change for inverted index."
594
0
                            << ", err=" << status.to_string();
595
0
                    return status;
596
28
                }
597
598
                // write inverted index data, or ann index data
599
28
                status = _write_inverted_index_data(output_rowset_schema, iter->data_id(),
600
28
                                                    block.get());
601
28
                DBUG_EXECUTE_IF(
602
28
                        "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", {
603
28
                            status = Status::Error<ErrorCode::INTERNAL_ERROR>(
604
28
                                    "debug point: "
605
28
                                    "handle_single_rowset_write_inverted_index_data_error");
606
28
                        })
607
28
                if (!status.ok()) {
608
0
                    return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
609
0
                            "failed to write block.");
610
0
                }
611
28
                block->clear_column_data();
612
28
            }
613
614
            // finish write inverted index, flush data to compound file
615
36
            for (auto& writer_sign : inverted_index_writer_signs) {
616
36
                try {
617
36
                    if (_index_column_writers[writer_sign]) {
618
36
                        RETURN_IF_ERROR(_index_column_writers[writer_sign]->finish());
619
36
                    }
620
36
                    DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", {
621
36
                        _CLTHROWA(CL_ERR_IO,
622
36
                                  "debug point: handle_single_rowset_index_build_finish_error");
623
36
                    })
624
36
                } catch (const std::exception& e) {
625
0
                    return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
626
0
                            "CLuceneError occurred: {}", e.what());
627
0
                }
628
36
            }
629
630
28
            _olap_data_convertor->reset();
631
28
        }
632
34
        for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
633
34
            auto st = index_file_writer->begin_close();
634
34
            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", {
635
34
                st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
636
34
                        "debug point: handle_single_rowset_file_writer_close_error");
637
34
            })
638
34
            if (!st.ok()) {
639
0
                LOG(ERROR) << "close index_file_writer error:" << st;
640
0
                return st;
641
0
            }
642
34
            inverted_index_size += index_file_writer->get_index_file_total_size();
643
34
        }
644
34
        for (auto&& [seg_id, index_file_writer] : _index_file_writers) {
645
34
            auto st = index_file_writer->finish_close();
646
34
            if (!st.ok()) {
647
0
                LOG(ERROR) << "wait close index_file_writer error:" << st;
648
0
                return st;
649
0
            }
650
34
        }
651
28
        _index_column_writers.clear();
652
28
        _index_file_writers.clear();
653
28
        output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size());
654
28
        output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() +
655
28
                                                inverted_index_size);
656
28
        output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() +
657
28
                                                inverted_index_size);
658
28
        LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows();
659
28
    }
660
661
28
    return Status::OK();
662
40
}
663
664
Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, int64_t segment_idx,
665
28
                                                Block* block) {
666
28
    VLOG_DEBUG << "begin to write inverted/ann index";
667
    // converter block data
668
28
    _olap_data_convertor->set_source_content(block, 0, block->rows());
669
64
    for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) {
670
36
        auto inverted_index = _alter_inverted_indexes[i];
671
36
        auto index_id = inverted_index.index_id;
672
36
        auto column_name = inverted_index.columns[0];
673
36
        auto column_idx = tablet_schema->field_index(column_name);
674
36
        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative",
675
36
                        { column_idx = -1; })
676
36
        if (column_idx < 0) {
677
2
            if (!inverted_index.column_unique_ids.empty()) {
678
2
                auto column_unique_id = inverted_index.column_unique_ids[0];
679
2
                column_idx = tablet_schema->field_index(column_unique_id);
680
2
            }
681
2
            if (column_idx < 0) {
682
0
                LOG(WARNING) << "referenced column was missing. "
683
0
                             << "[column=" << column_name << " referenced_column=" << column_idx
684
0
                             << "]";
685
0
                continue;
686
0
            }
687
2
        }
688
36
        const auto& column = tablet_schema->column(column_idx);
689
36
        auto writer_sign = std::make_pair(segment_idx, index_id);
690
36
        auto converted_result = _olap_data_convertor->convert_column_data(i);
691
36
        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", {
692
36
            converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>(
693
36
                    "debug point: _write_inverted_index_data_convert_column_data_error");
694
36
        })
695
36
        if (converted_result.first != Status::OK()) {
696
0
            LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first;
697
0
            return converted_result.first;
698
0
        }
699
36
        const auto* ptr = (const uint8_t*)converted_result.second->get_data();
700
36
        const auto* null_map = converted_result.second->get_nullmap();
701
36
        if (null_map) {
702
0
            RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, &column, null_map, &ptr,
703
0
                                          block->rows()));
704
36
        } else {
705
36
            RETURN_IF_ERROR(_add_data(column_name, writer_sign, &column, &ptr, block->rows()));
706
36
        }
707
36
    }
708
28
    _olap_data_convertor->clear_source_content();
709
710
28
    return Status::OK();
711
28
}
712
713
Status IndexBuilder::_add_nullable(const std::string& column_name,
714
                                   const std::pair<int64_t, int64_t>& index_writer_sign,
715
                                   const TabletColumn* column, const uint8_t* null_map,
716
0
                                   const uint8_t** ptr, size_t num_rows) {
717
    // TODO: need to process null data for inverted index
718
0
    if (column->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
719
0
        DCHECK(column->get_subtype_count() == 1);
720
        // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
721
0
        const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
722
        // total number length
723
0
        auto offset_data = *(data_ptr + 1);
724
0
        const auto* offsets_ptr = (const uint8_t*)offset_data;
725
0
        try {
726
0
            auto data = *(data_ptr + 2);
727
0
            auto nested_null_map = *(data_ptr + 3);
728
0
            RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values(
729
0
                    field_type_size(column->get_sub_column(0).type()),
730
0
                    reinterpret_cast<const void*>(data),
731
0
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
732
0
            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", {
733
0
                _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error");
734
0
            })
735
0
            RETURN_IF_ERROR(
736
0
                    _index_column_writers[index_writer_sign]->add_array_nulls(null_map, num_rows));
737
0
        } catch (const std::exception& e) {
738
0
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
739
0
                    "CLuceneError occurred: {}", e.what());
740
0
        }
741
742
0
        return Status::OK();
743
0
    }
744
0
    size_t offset = 0;
745
0
    auto next_run_step = [&]() {
746
0
        size_t step = 1;
747
0
        for (auto i = offset + 1; i < num_rows; ++i) {
748
0
            if (null_map[offset] == null_map[i]) {
749
0
                step++;
750
0
            } else {
751
0
                break;
752
0
            }
753
0
        }
754
0
        return step;
755
0
    };
756
0
    try {
757
0
        do {
758
0
            auto step = next_run_step();
759
0
            if (null_map[offset]) {
760
0
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_nulls(
761
0
                        static_cast<uint32_t>(step)));
762
0
            } else {
763
0
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name,
764
0
                                                                                     *ptr, step));
765
0
            }
766
0
            *ptr += field_type_size(column->type()) * step;
767
0
            offset += step;
768
0
            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception",
769
0
                            { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); })
770
0
        } while (offset < num_rows);
771
0
    } catch (const std::exception& e) {
772
0
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occurred: {}",
773
0
                                                                      e.what());
774
0
    }
775
776
0
    return Status::OK();
777
0
}
778
779
Status IndexBuilder::_add_data(const std::string& column_name,
780
                               const std::pair<int64_t, int64_t>& index_writer_sign,
781
36
                               const TabletColumn* column, const uint8_t** ptr, size_t num_rows) {
782
36
    try {
783
36
        if (column->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
784
4
            DCHECK(column->get_subtype_count() == 1);
785
            // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
786
4
            const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
787
            // total number length
788
4
            auto element_cnt = size_t((unsigned long)(*data_ptr));
789
4
            auto offset_data = *(data_ptr + 1);
790
4
            const auto* offsets_ptr = (const uint8_t*)offset_data;
791
4
            if (element_cnt > 0) {
792
4
                auto data = *(data_ptr + 2);
793
4
                auto nested_null_map = *(data_ptr + 3);
794
4
                RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values(
795
4
                        field_type_size(column->get_sub_column(0).type()),
796
4
                        reinterpret_cast<const void*>(data),
797
4
                        reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
798
4
            }
799
32
        } else {
800
32
            RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, *ptr,
801
32
                                                                                 num_rows));
802
32
        }
803
36
        DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception",
804
36
                        { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); })
805
36
    } catch (const std::exception& e) {
806
0
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occurred: {}",
807
0
                                                                      e.what());
808
0
    }
809
810
36
    return Status::OK();
811
36
}
812
813
40
Status IndexBuilder::handle_inverted_index_data() {
814
40
    LOG(INFO) << "begin to handle_inverted_index_data";
815
40
    DCHECK(_input_rowsets.size() == _output_rowsets.size());
816
42
    for (auto& _output_rowset : _output_rowsets) {
817
42
        SegmentCacheHandle segment_cache_handle;
818
42
        RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
819
42
                std::static_pointer_cast<BetaRowset>(_output_rowset), &segment_cache_handle));
820
42
        auto output_rowset_meta = _output_rowset->rowset_meta();
821
42
        auto& segments = segment_cache_handle.get_segments();
822
42
        RETURN_IF_ERROR(handle_single_rowset(output_rowset_meta, segments));
823
42
    }
824
38
    return Status::OK();
825
40
}
826
827
48
Status IndexBuilder::do_build_inverted_index() {
828
48
    LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id()
829
48
              << ", is_drop_op=" << _is_drop_op;
830
48
    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty",
831
48
                    { _alter_inverted_indexes.clear(); })
832
48
    if (_alter_inverted_indexes.empty()) {
833
0
        return Status::OK();
834
0
    }
835
836
48
    static constexpr long TRY_LOCK_TIMEOUT = 30;
837
48
    std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::defer_lock);
838
48
    bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT));
839
840
48
    if (!owns_lock) {
841
0
        return Status::ObtainLockFailed(
842
0
                "try schema_change_lock failed. There might be schema change or cooldown running "
843
0
                "on "
844
0
                "tablet={} ",
845
0
                _tablet->tablet_id());
846
0
    }
847
    // Check executing serially with compaction task.
848
48
    std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
849
48
                                                      std::try_to_lock);
850
48
    if (!base_compaction_lock.owns_lock()) {
851
0
        return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ",
852
0
                                        _tablet->tablet_id());
853
0
    }
854
48
    std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
855
48
                                                      std::try_to_lock);
856
48
    if (!cumu_compaction_lock.owns_lock()) {
857
0
        return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}",
858
0
                                        _tablet->tablet_id());
859
0
    }
860
861
48
    std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
862
48
                                                      std::try_to_lock);
863
48
    if (!cold_compaction_lock.owns_lock()) {
864
0
        return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}",
865
0
                                        _tablet->tablet_id());
866
0
    }
867
868
48
    std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
869
48
                                                           std::try_to_lock);
870
48
    if (!build_inverted_index_lock.owns_lock()) {
871
0
        return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}",
872
0
                                        _tablet->tablet_id());
873
0
    }
874
875
48
    std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
876
48
    if (!migration_rlock.owns_lock()) {
877
0
        return Status::ObtainLockFailed("got migration_rlock failed. tablet={}",
878
0
                                        _tablet->tablet_id());
879
0
    }
880
881
48
    DCHECK(!_alter_index_ids.empty());
882
48
    _input_rowsets =
883
48
            _tablet->pick_candidate_rowsets_to_build_inverted_index(_alter_index_ids, _is_drop_op);
884
48
    if (_input_rowsets.empty()) {
885
2
        LOG(INFO) << "_input_rowsets is empty";
886
2
        return Status::OK();
887
2
    }
888
889
46
    auto st = update_inverted_index_info();
890
46
    if (!st.ok()) {
891
6
        LOG(WARNING) << "failed to update_inverted_index_info. "
892
6
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
893
6
        gc_output_rowset();
894
6
        return st;
895
6
    }
896
897
    // create inverted index file for output rowset
898
40
    st = handle_inverted_index_data();
899
40
    if (!st.ok()) {
900
2
        LOG(WARNING) << "failed to handle_inverted_index_data. "
901
2
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
902
2
        gc_output_rowset();
903
2
        return st;
904
2
    }
905
906
    // modify rowsets in memory
907
38
    st = modify_rowsets();
908
38
    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", {
909
38
        st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>(
910
38
                "debug point: do_build_inverted_index_modify_rowsets_status_error");
911
38
    })
912
38
    if (!st.ok()) {
913
0
        LOG(WARNING) << "failed to modify rowsets in memory. "
914
0
                     << "tablet=" << _tablet->tablet_id() << ", error=" << st;
915
0
        gc_output_rowset();
916
0
        return st;
917
0
    }
918
38
    return Status::OK();
919
38
}
920
921
38
Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) {
922
38
    DCHECK(std::ranges::all_of(
923
38
            _output_rowsets.begin(), _output_rowsets.end(), [&engine = _engine](auto&& rs) {
924
38
                if (engine.check_rowset_id_in_unused_rowsets(rs->rowset_id())) {
925
38
                    LOG(ERROR) << "output rowset: " << rs->rowset_id() << " in unused rowsets";
926
38
                    return false;
927
38
                }
928
38
                return true;
929
38
            }));
930
931
38
    if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
932
38
        _tablet->enable_unique_key_merge_on_write()) {
933
2
        std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock());
934
2
        std::lock_guard meta_wlock(_tablet->get_header_lock());
935
2
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
936
2
        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id());
937
4
        for (auto i = 0; i < _input_rowsets.size(); ++i) {
938
2
            RowsetId input_rowset_id = _input_rowsets[i]->rowset_id();
939
2
            RowsetId output_rowset_id = _output_rowsets[i]->rowset_id();
940
2
            for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) {
941
0
                RowsetId rs_id = std::get<0>(k);
942
0
                if (rs_id == input_rowset_id) {
943
0
                    DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k),
944
0
                                                             std::get<2>(k)};
945
0
                    auto res = delete_bitmap->set(output_rs_key, v);
946
0
                    DCHECK(res > 0) << "delete_bitmap set failed, res=" << res;
947
0
                }
948
0
            }
949
2
        }
950
2
        _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
951
952
        // modify_rowsets will remove the delete_bitmap for input rowsets,
953
        // should call it after merge delete_bitmap
954
2
        RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true));
955
36
    } else {
956
36
        std::lock_guard wrlock(_tablet->get_header_lock());
957
36
        RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true));
958
36
    }
959
960
#ifndef BE_TEST
961
    {
962
        std::shared_lock rlock(_tablet->get_header_lock());
963
        _tablet->save_meta();
964
    }
965
#endif
966
38
    return Status::OK();
967
38
}
968
969
8
void IndexBuilder::gc_output_rowset() {
970
8
    for (auto&& output_rowset : _output_rowsets) {
971
4
        auto is_local_rowset = output_rowset->is_local();
972
4
        DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset",
973
4
                        { is_local_rowset = false; })
974
4
        if (!is_local_rowset) {
975
0
            _tablet->record_unused_remote_rowset(output_rowset->rowset_id(),
976
0
                                                 output_rowset->rowset_meta()->resource_id(),
977
0
                                                 output_rowset->num_segments());
978
0
            return;
979
0
        }
980
4
        _engine.add_unused_rowset(output_rowset);
981
4
    }
982
8
}
983
984
} // namespace doris