be/src/storage/task/index_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/task/index_builder.h" |
19 | | |
20 | | #include <mutex> |
21 | | |
22 | | #include "common/logging.h" |
23 | | #include "common/status.h" |
24 | | #include "storage/field.h" |
25 | | #include "storage/index/index_file_reader.h" |
26 | | #include "storage/index/index_file_writer.h" |
27 | | #include "storage/index/inverted/inverted_index_desc.h" |
28 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
29 | | #include "storage/olap_define.h" |
30 | | #include "storage/rowset/beta_rowset.h" |
31 | | #include "storage/rowset/rowset_writer_context.h" |
32 | | #include "storage/segment/segment_loader.h" |
33 | | #include "storage/storage_engine.h" |
34 | | #include "storage/tablet/tablet_schema.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/trace.h" |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | |
41 | | IndexBuilder::IndexBuilder(StorageEngine& engine, TabletSharedPtr tablet, |
42 | | const std::vector<TColumn>& columns, |
43 | | const std::vector<doris::TOlapTableIndex>& alter_inverted_indexes, |
44 | | bool is_drop_op) |
45 | 22 | : _engine(engine), |
46 | 22 | _tablet(std::move(tablet)), |
47 | 22 | _columns(columns), |
48 | 22 | _alter_inverted_indexes(alter_inverted_indexes), |
49 | 22 | _is_drop_op(is_drop_op) { |
50 | 22 | _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>(); |
51 | 22 | } |
52 | | |
53 | 22 | IndexBuilder::~IndexBuilder() { |
54 | 22 | _olap_data_convertor.reset(); |
55 | 22 | _index_column_writers.clear(); |
56 | 22 | } |
57 | | |
58 | 22 | Status IndexBuilder::init() { |
59 | 24 | for (auto inverted_index : _alter_inverted_indexes) { |
60 | 24 | _alter_index_ids.insert(inverted_index.index_id); |
61 | 24 | } |
62 | 22 | return Status::OK(); |
63 | 22 | } |
64 | | |
65 | 18 | Status IndexBuilder::update_inverted_index_info() { |
66 | | // just do link files |
67 | 18 | LOG(INFO) << "begin to update_inverted_index_info, tablet=" << _tablet->tablet_id() |
68 | 18 | << ", is_drop_op=" << _is_drop_op; |
69 | | // index ids that will not be linked |
70 | 18 | std::set<int64_t> without_index_uids; |
71 | 18 | _output_rowsets.reserve(_input_rowsets.size()); |
72 | 18 | _pending_rs_guards.reserve(_input_rowsets.size()); |
73 | 18 | for (auto&& input_rowset : _input_rowsets) { |
74 | 18 | bool is_local_rowset = input_rowset->is_local(); |
75 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset", |
76 | 18 | { is_local_rowset = false; }) |
77 | 18 | if (!is_local_rowset) [[unlikely]] { |
78 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id(); |
79 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
80 | 0 | _tablet->tablet_id(), |
81 | 0 | input_rowset->rowset_id().to_string()); |
82 | 0 | } |
83 | | |
84 | 18 | TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>(); |
85 | 18 | const auto& input_rs_tablet_schema = input_rowset->tablet_schema(); |
86 | 18 | output_rs_tablet_schema->copy_from(*input_rs_tablet_schema); |
87 | 18 | int64_t total_index_size = 0; |
88 | 18 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); |
89 | 18 | auto size_st = beta_rowset->get_inverted_index_size(&total_index_size); |
90 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", { |
91 | 18 | size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed"); |
92 | 18 | }) |
93 | 18 | if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() && |
94 | 18 | !size_st.is<ErrorCode::NOT_FOUND>()) { |
95 | 0 | return size_st; |
96 | 0 | } |
97 | 18 | auto num_segments = input_rowset->num_segments(); |
98 | 18 | size_t drop_index_size = 0; |
99 | | |
100 | 18 | if (_is_drop_op) { |
101 | 4 | for (const auto& t_inverted_index : _alter_inverted_indexes) { |
102 | 4 | DCHECK_EQ(t_inverted_index.columns.size(), 1); |
103 | 4 | auto column_name = t_inverted_index.columns[0]; |
104 | 4 | auto column_idx = output_rs_tablet_schema->field_index(column_name); |
105 | 4 | if (column_idx < 0) { |
106 | 0 | if (!t_inverted_index.column_unique_ids.empty()) { |
107 | 0 | auto column_unique_id = t_inverted_index.column_unique_ids[0]; |
108 | 0 | column_idx = output_rs_tablet_schema->field_index(column_unique_id); |
109 | 0 | } |
110 | 0 | if (column_idx < 0) { |
111 | 0 | LOG(WARNING) << "referenced column was missing. " |
112 | 0 | << "[column=" << column_name |
113 | 0 | << " referenced_column=" << column_idx << "]"; |
114 | 0 | continue; |
115 | 0 | } |
116 | 0 | } |
117 | 4 | auto column = output_rs_tablet_schema->column(column_idx); |
118 | | |
119 | | // inverted index |
120 | 4 | auto index_metas = output_rs_tablet_schema->inverted_indexs(column); |
121 | 4 | for (const auto& index_meta : index_metas) { |
122 | | // Only drop the index that matches the requested index_id, |
123 | | // not all indexes on this column |
124 | 4 | if (index_meta->index_id() != t_inverted_index.index_id) { |
125 | 1 | continue; |
126 | 1 | } |
127 | 3 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
128 | 3 | InvertedIndexStorageFormatPB::V1) { |
129 | 1 | const auto& fs = io::global_local_filesystem(); |
130 | | |
131 | 2 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
132 | 1 | auto seg_path = local_segment_path( |
133 | 1 | _tablet->tablet_path(), input_rowset->rowset_id().to_string(), |
134 | 1 | seg_id); |
135 | 1 | auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( |
136 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
137 | 1 | index_meta->index_id(), index_meta->get_index_suffix()); |
138 | 1 | int64_t index_size = 0; |
139 | 1 | RETURN_IF_ERROR(fs->file_size(index_path, &index_size)); |
140 | 1 | VLOG_DEBUG << "inverted index file:" << index_path |
141 | 0 | << " size:" << index_size; |
142 | 1 | drop_index_size += index_size; |
143 | 1 | } |
144 | 1 | } |
145 | 3 | _dropped_inverted_indexes.push_back(*index_meta); |
146 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
147 | | // remove dropped index_meta from output rowset tablet schema |
148 | 3 | output_rs_tablet_schema->remove_index(index_meta->index_id()); |
149 | 3 | } |
150 | | |
151 | | // ann index |
152 | 4 | const auto* ann_index = output_rs_tablet_schema->ann_index(column); |
153 | 4 | if (!ann_index) { |
154 | 3 | continue; |
155 | 3 | } |
156 | | // Only drop the ann index that matches the requested index_id |
157 | 1 | if (ann_index->index_id() != t_inverted_index.index_id) { |
158 | 0 | continue; |
159 | 0 | } |
160 | 1 | DCHECK(output_rs_tablet_schema->get_inverted_index_storage_format() != |
161 | 1 | InvertedIndexStorageFormatPB::V1); |
162 | 1 | _dropped_inverted_indexes.push_back(*ann_index); |
163 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
164 | | // remove dropped index_meta from output rowset tablet schema |
165 | 1 | output_rs_tablet_schema->remove_index(ann_index->index_id()); |
166 | 1 | } |
167 | | |
168 | 4 | DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", { |
169 | 4 | auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
170 | 4 | "index_builder.update_inverted_index_info.drop_index", "indexes_count", 0); |
171 | 4 | if (indexes_count < 0) { |
172 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
173 | 4 | "indexes count cannot be negative"); |
174 | 4 | } |
175 | 4 | auto indexes_size = output_rs_tablet_schema->inverted_indexes().size(); |
176 | 4 | if (indexes_count != indexes_size) { |
177 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
178 | 4 | "indexes count not equal to expected"); |
179 | 4 | } |
180 | 4 | }) |
181 | 14 | } else { |
182 | | // base on input rowset's tablet_schema to build |
183 | | // output rowset's tablet_schema which only add |
184 | | // the indexes specified in this build index request |
185 | 16 | for (auto t_inverted_index : _alter_inverted_indexes) { |
186 | 16 | TabletIndex index; |
187 | 16 | index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema); |
188 | 16 | auto column_uid = index.col_unique_ids()[0]; |
189 | 16 | if (column_uid < 0) { |
190 | 3 | LOG(WARNING) << "referenced column was missing. " |
191 | 3 | << "[column=" << t_inverted_index.columns[0] |
192 | 3 | << " referenced_column=" << column_uid << "]"; |
193 | 3 | continue; |
194 | 3 | } |
195 | 13 | const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid); |
196 | | |
197 | | // inverted index |
198 | 13 | auto exist_indexs = output_rs_tablet_schema->inverted_indexs(col); |
199 | 13 | for (const auto& exist_index : exist_indexs) { |
200 | 0 | if (exist_index->index_id() != index.index_id()) { |
201 | 0 | if (exist_index->is_same_except_id(&index)) { |
202 | 0 | LOG(WARNING) << fmt::format( |
203 | 0 | "column: {} has a exist inverted index, but the index id not " |
204 | 0 | "equal " |
205 | 0 | "request's index id, , exist index id: {}, request's index id: " |
206 | 0 | "{}, " |
207 | 0 | "remove exist index in new output_rs_tablet_schema", |
208 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
209 | 0 | without_index_uids.insert(exist_index->index_id()); |
210 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
211 | 0 | } |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | | // ann index |
216 | 13 | const auto* exist_index = output_rs_tablet_schema->ann_index(col); |
217 | 13 | if (exist_index && exist_index->index_id() != index.index_id()) { |
218 | 0 | if (exist_index->is_same_except_id(&index)) { |
219 | 0 | LOG(WARNING) << fmt::format( |
220 | 0 | "column: {} has a exist ann index, but the index id not " |
221 | 0 | "equal request's index id, , exist index id: {}, request's index " |
222 | 0 | "id: {}, remove exist index in new output_rs_tablet_schema", |
223 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
224 | 0 | without_index_uids.insert(exist_index->index_id()); |
225 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
226 | 0 | } |
227 | 0 | } |
228 | | |
229 | 13 | output_rs_tablet_schema->append_index(std::move(index)); |
230 | 13 | } |
231 | 14 | } |
232 | | // construct input rowset reader |
233 | 18 | RowsetReaderSharedPtr input_rs_reader; |
234 | 18 | RETURN_IF_ERROR(input_rowset->create_reader(&input_rs_reader)); |
235 | | // construct output rowset writer |
236 | 18 | RowsetWriterContext context; |
237 | 18 | context.version = input_rs_reader->version(); |
238 | 18 | context.rowset_state = VISIBLE; |
239 | 18 | context.segments_overlap = input_rowset->rowset_meta()->segments_overlap(); |
240 | 18 | context.tablet_schema = output_rs_tablet_schema; |
241 | 18 | context.newest_write_timestamp = input_rs_reader->newest_write_timestamp(); |
242 | 18 | auto output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); |
243 | 18 | _pending_rs_guards.push_back(_engine.add_pending_rowset(context)); |
244 | | |
245 | | // if without_index_uids is not empty, copy _alter_index_ids to it |
246 | | // else just use _alter_index_ids to avoid copy |
247 | 18 | if (!without_index_uids.empty()) { |
248 | 0 | without_index_uids.insert(_alter_index_ids.begin(), _alter_index_ids.end()); |
249 | 0 | } |
250 | | |
251 | | // build output rowset |
252 | 18 | RETURN_IF_ERROR(input_rowset->link_files_to( |
253 | 18 | _tablet->tablet_path(), output_rs_writer->rowset_id(), 0, |
254 | 18 | without_index_uids.empty() ? &_alter_index_ids : &without_index_uids)); |
255 | | |
256 | 18 | auto input_rowset_meta = input_rowset->rowset_meta(); |
257 | 18 | RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>(); |
258 | 18 | rowset_meta->set_num_rows(input_rowset_meta->num_rows()); |
259 | 18 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
260 | 18 | InvertedIndexStorageFormatPB::V1) { |
261 | 4 | if (_is_drop_op) { |
262 | 1 | VLOG_DEBUG << "data_disk_size:" << input_rowset_meta->data_disk_size() |
263 | 0 | << " total_disk_size:" << input_rowset_meta->total_disk_size() |
264 | 0 | << " index_disk_size:" << input_rowset_meta->index_disk_size() |
265 | 0 | << " drop_index_size:" << drop_index_size; |
266 | 1 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
267 | 1 | drop_index_size); |
268 | 1 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
269 | 1 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
270 | 1 | drop_index_size); |
271 | 3 | } else { |
272 | 3 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size()); |
273 | 3 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
274 | 3 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size()); |
275 | 3 | } |
276 | 14 | } else { |
277 | 31 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
278 | 17 | auto seg_path = DORIS_TRY(input_rowset->segment_path(seg_id)); |
279 | 17 | auto idx_file_reader = std::make_unique<IndexFileReader>( |
280 | 17 | context.fs(), |
281 | 17 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, |
282 | 17 | output_rs_tablet_schema->get_inverted_index_storage_format(), |
283 | 17 | InvertedIndexFileInfo(), _tablet->tablet_id()); |
284 | 17 | auto st = idx_file_reader->init(); |
285 | 17 | DBUG_EXECUTE_IF( |
286 | 17 | "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", { |
287 | 17 | st = Status::Error<ErrorCode::INIT_FAILED>( |
288 | 17 | "debug point: reader init error"); |
289 | 17 | }) |
290 | 17 | if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) { |
291 | 0 | return st; |
292 | 0 | } |
293 | 17 | _index_file_readers.emplace( |
294 | 17 | std::make_pair(output_rs_writer->rowset_id().to_string(), seg_id), |
295 | 17 | std::move(idx_file_reader)); |
296 | 17 | } |
297 | 14 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
298 | 14 | total_index_size); |
299 | 14 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
300 | 14 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
301 | 14 | total_index_size); |
302 | 14 | } |
303 | 18 | rowset_meta->set_empty(input_rowset_meta->empty()); |
304 | 18 | rowset_meta->set_num_segments(input_rowset_meta->num_segments()); |
305 | 18 | rowset_meta->set_segments_overlap(input_rowset_meta->segments_overlap()); |
306 | 18 | rowset_meta->set_rowset_state(input_rowset_meta->rowset_state()); |
307 | 18 | std::vector<KeyBoundsPB> key_bounds; |
308 | 18 | RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds)); |
309 | 18 | rowset_meta->set_segments_key_bounds_truncated( |
310 | 18 | input_rowset_meta->is_segments_key_bounds_truncated()); |
311 | 18 | rowset_meta->set_segments_key_bounds(key_bounds); |
312 | 18 | std::vector<uint32_t> num_segment_rows; |
313 | 18 | input_rowset_meta->get_num_segment_rows(&num_segment_rows); |
314 | 18 | rowset_meta->set_num_segment_rows(num_segment_rows); |
315 | 18 | auto output_rowset = output_rs_writer->manual_build(rowset_meta); |
316 | 18 | if (input_rowset_meta->has_delete_predicate()) { |
317 | 0 | output_rowset->rowset_meta()->set_delete_predicate( |
318 | 0 | input_rowset_meta->delete_predicate()); |
319 | 0 | } |
320 | 18 | _output_rowsets.push_back(output_rowset); |
321 | 18 | } |
322 | | |
323 | 18 | return Status::OK(); |
324 | 18 | } |
325 | | |
326 | | Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta, |
327 | 16 | std::vector<segment_v2::SegmentSharedPtr>& segments) { |
328 | 16 | bool is_local_rowset = output_rowset_meta->is_local(); |
329 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset", |
330 | 16 | { is_local_rowset = false; }) |
331 | 16 | if (!is_local_rowset) [[unlikely]] { |
332 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id(); |
333 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
334 | 0 | _tablet->tablet_id(), |
335 | 0 | output_rowset_meta->rowset_id().to_string()); |
336 | 0 | } |
337 | | |
338 | 16 | if (_is_drop_op) { |
339 | 4 | const auto& output_rs_tablet_schema = output_rowset_meta->tablet_schema(); |
340 | 4 | if (output_rs_tablet_schema->get_inverted_index_storage_format() != |
341 | 4 | InvertedIndexStorageFormatPB::V1) { |
342 | 3 | const auto& fs = output_rowset_meta->fs(); |
343 | | |
344 | 3 | const auto& output_rowset_schema = output_rowset_meta->tablet_schema(); |
345 | 3 | size_t inverted_index_size = 0; |
346 | 3 | for (auto& seg_ptr : segments) { |
347 | 3 | auto idx_file_reader_iter = _index_file_readers.find( |
348 | 3 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
349 | 3 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op", |
350 | 3 | { idx_file_reader_iter = _index_file_readers.end(); }) |
351 | 3 | if (idx_file_reader_iter == _index_file_readers.end()) { |
352 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
353 | 0 | << seg_ptr->id() << " cannot be found"; |
354 | 0 | continue; |
355 | 0 | } |
356 | 3 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
357 | | |
358 | 3 | std::string index_path_prefix { |
359 | 3 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
360 | 3 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
361 | 3 | seg_ptr->id()))}; |
362 | | |
363 | 3 | std::string index_path = |
364 | 3 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
365 | 3 | io::FileWriterPtr file_writer; |
366 | 3 | Status st = fs->create_file(index_path, &file_writer); |
367 | 3 | if (!st.ok()) { |
368 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
369 | 0 | << ", err: " << st; |
370 | 0 | return st; |
371 | 0 | } |
372 | 3 | auto index_file_writer = std::make_unique<IndexFileWriter>( |
373 | 3 | fs, std::move(index_path_prefix), |
374 | 3 | output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), |
375 | 3 | output_rowset_schema->get_inverted_index_storage_format(), |
376 | 3 | std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); |
377 | 3 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
378 | | // create inverted index writer |
379 | 3 | for (auto& index_meta : _dropped_inverted_indexes) { |
380 | 3 | RETURN_IF_ERROR(index_file_writer->delete_index(&index_meta)); |
381 | 3 | } |
382 | 3 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
383 | 3 | } |
384 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
385 | 3 | auto st = index_file_writer->begin_close(); |
386 | 3 | if (!st.ok()) { |
387 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
388 | 0 | return st; |
389 | 0 | } |
390 | 3 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
391 | 3 | } |
392 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
393 | 3 | auto st = index_file_writer->finish_close(); |
394 | 3 | if (!st.ok()) { |
395 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
396 | 0 | return st; |
397 | 0 | } |
398 | 3 | } |
399 | 3 | _index_file_writers.clear(); |
400 | 3 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
401 | 3 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
402 | 3 | inverted_index_size); |
403 | 3 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
404 | 3 | inverted_index_size); |
405 | 3 | } |
406 | 4 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
407 | 4 | return Status::OK(); |
408 | 12 | } else { |
409 | | // create inverted or ann index writer |
410 | 12 | const auto& fs = output_rowset_meta->fs(); |
411 | 12 | auto output_rowset_schema = output_rowset_meta->tablet_schema(); |
412 | 12 | size_t inverted_index_size = 0; |
413 | 15 | for (auto& seg_ptr : segments) { |
414 | 15 | std::string index_path_prefix { |
415 | 15 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
416 | 15 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
417 | 15 | seg_ptr->id()))}; |
418 | 15 | std::vector<ColumnId> return_columns; |
419 | 15 | std::vector<std::pair<int64_t, int64_t>> inverted_index_writer_signs; |
420 | 15 | _olap_data_convertor->reserve(_alter_inverted_indexes.size()); |
421 | | |
422 | 15 | std::unique_ptr<IndexFileWriter> index_file_writer = nullptr; |
423 | 15 | if (output_rowset_schema->get_inverted_index_storage_format() >= |
424 | 15 | InvertedIndexStorageFormatPB::V2) { |
425 | 12 | auto idx_file_reader_iter = _index_file_readers.find( |
426 | 12 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
427 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader", |
428 | 12 | { idx_file_reader_iter = _index_file_readers.end(); }) |
429 | 12 | if (idx_file_reader_iter == _index_file_readers.end()) { |
430 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
431 | 0 | << seg_ptr->id() << " cannot be found"; |
432 | 0 | continue; |
433 | 0 | } |
434 | 12 | std::string index_path = |
435 | 12 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
436 | 12 | io::FileWriterPtr file_writer; |
437 | 12 | Status st = fs->create_file(index_path, &file_writer); |
438 | 12 | if (!st.ok()) { |
439 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
440 | 0 | << ", err: " << st; |
441 | 0 | return st; |
442 | 0 | } |
443 | 12 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
444 | 12 | index_file_writer = std::make_unique<IndexFileWriter>( |
445 | 12 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
446 | 12 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), |
447 | 12 | std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); |
448 | 12 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
449 | 12 | } else { |
450 | 3 | index_file_writer = std::make_unique<IndexFileWriter>( |
451 | 3 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
452 | 3 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), |
453 | 3 | nullptr, true /* can_use_ram_dir */, _tablet->tablet_id()); |
454 | 3 | } |
455 | | // create inverted index writer, or ann index writer |
456 | 19 | for (auto inverted_index : _alter_inverted_indexes) { |
457 | 19 | DCHECK(inverted_index.index_type == TIndexType::INVERTED || |
458 | 19 | inverted_index.index_type == TIndexType::ANN); |
459 | 19 | DCHECK_EQ(inverted_index.columns.size(), 1); |
460 | 19 | auto index_id = inverted_index.index_id; |
461 | 19 | auto column_name = inverted_index.columns[0]; |
462 | 19 | auto column_idx = output_rowset_schema->field_index(column_name); |
463 | 19 | if (column_idx < 0) { |
464 | 4 | if (inverted_index.__isset.column_unique_ids && |
465 | 4 | !inverted_index.column_unique_ids.empty()) { |
466 | 1 | column_idx = output_rowset_schema->field_index( |
467 | 1 | inverted_index.column_unique_ids[0]); |
468 | 1 | } |
469 | 4 | if (column_idx < 0) { |
470 | 3 | LOG(WARNING) << "referenced column was missing. " |
471 | 3 | << "[column=" << column_name |
472 | 3 | << " referenced_column=" << column_idx << "]"; |
473 | 3 | continue; |
474 | 3 | } |
475 | 4 | } |
476 | 16 | auto column = output_rowset_schema->column(column_idx); |
477 | | // variant column is not support for building index |
478 | 16 | auto is_support_inverted_index = |
479 | 16 | IndexColumnWriter::check_support_inverted_index(column); |
480 | 16 | auto is_support_ann_index = IndexColumnWriter::check_support_ann_index(column); |
481 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index", |
482 | 16 | { is_support_inverted_index = false; }) |
483 | 16 | if (!is_support_inverted_index && !is_support_ann_index) { |
484 | 0 | continue; |
485 | 0 | } |
486 | 16 | DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id)); |
487 | 16 | _olap_data_convertor->add_column_data_convertor(column); |
488 | 16 | return_columns.emplace_back(column_idx); |
489 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
490 | | |
491 | 16 | if (inverted_index.index_type == TIndexType::INVERTED) { |
492 | | // inverted index |
493 | 15 | auto index_metas = output_rowset_schema->inverted_indexs(column); |
494 | 15 | for (const auto& index_meta : index_metas) { |
495 | 15 | if (index_meta->index_id() != index_id) { |
496 | 0 | continue; |
497 | 0 | } |
498 | 15 | std::unique_ptr<segment_v2::IndexColumnWriter> inverted_index_builder; |
499 | 15 | try { |
500 | 15 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
501 | 15 | field.get(), &inverted_index_builder, index_file_writer.get(), |
502 | 15 | index_meta)); |
503 | 15 | DBUG_EXECUTE_IF( |
504 | 15 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
505 | 15 | "error", |
506 | 15 | { |
507 | 15 | _CLTHROWA(CL_ERR_IO, |
508 | 15 | "debug point: " |
509 | 15 | "handle_single_rowset_index_column_writer_create_" |
510 | 15 | "error"); |
511 | 15 | }) |
512 | 15 | } catch (const std::exception& e) { |
513 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
514 | 0 | "CLuceneError occured: {}", e.what()); |
515 | 0 | } |
516 | | |
517 | 15 | if (inverted_index_builder) { |
518 | 15 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
519 | 15 | _index_column_writers.insert( |
520 | 15 | std::make_pair(writer_sign, std::move(inverted_index_builder))); |
521 | 15 | inverted_index_writer_signs.emplace_back(writer_sign); |
522 | 15 | } |
523 | 15 | } |
524 | 15 | } else if (inverted_index.index_type == TIndexType::ANN) { |
525 | | // ann index |
526 | 1 | const auto* index_meta = output_rowset_schema->ann_index(column); |
527 | 1 | if (index_meta && index_meta->index_id() == index_id) { |
528 | 1 | std::unique_ptr<segment_v2::IndexColumnWriter> index_writer; |
529 | 1 | try { |
530 | 1 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
531 | 1 | field.get(), &index_writer, index_file_writer.get(), |
532 | 1 | index_meta)); |
533 | 1 | DBUG_EXECUTE_IF( |
534 | 1 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
535 | 1 | "error", |
536 | 1 | { |
537 | 1 | _CLTHROWA(CL_ERR_IO, |
538 | 1 | "debug point: " |
539 | 1 | "handle_single_rowset_index_column_writer_create_" |
540 | 1 | "error"); |
541 | 1 | }) |
542 | 1 | } catch (const std::exception& e) { |
543 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
544 | 0 | "CLuceneError occured: {}", e.what()); |
545 | 0 | } |
546 | | |
547 | 1 | if (index_writer) { |
548 | 1 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
549 | 1 | _index_column_writers.insert( |
550 | 1 | std::make_pair(writer_sign, std::move(index_writer))); |
551 | 1 | inverted_index_writer_signs.emplace_back(writer_sign); |
552 | 1 | } |
553 | 1 | } |
554 | 1 | } |
555 | 16 | } |
556 | | |
557 | | // DO NOT forget index_file_writer for the segment, otherwise, original inverted index will be deleted. |
558 | 15 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
559 | 15 | if (return_columns.empty()) { |
560 | | // no columns to read |
561 | 3 | continue; |
562 | 3 | } |
563 | | // create iterator for each segment |
564 | 12 | StorageReadOptions read_options; |
565 | 12 | OlapReaderStatistics stats; |
566 | 12 | read_options.stats = &stats; |
567 | 12 | read_options.tablet_schema = output_rowset_schema; |
568 | 12 | std::shared_ptr<Schema> schema = |
569 | 12 | std::make_shared<Schema>(output_rowset_schema->columns(), return_columns); |
570 | 12 | std::unique_ptr<RowwiseIterator> iter; |
571 | 12 | auto res = seg_ptr->new_iterator(schema, read_options, &iter); |
572 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", { |
573 | 12 | res = Status::Error<ErrorCode::INTERNAL_ERROR>( |
574 | 12 | "debug point: handle_single_rowset_create_iterator_error"); |
575 | 12 | }) |
576 | 12 | if (!res.ok()) { |
577 | 0 | LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() |
578 | 0 | << "]: " << res.to_string(); |
579 | 0 | return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string()); |
580 | 0 | } |
581 | | |
582 | 12 | auto block = Block::create_unique(output_rowset_schema->create_block(return_columns)); |
583 | 24 | while (true) { |
584 | 24 | auto status = iter->next_batch(block.get()); |
585 | 24 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", { |
586 | 24 | status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
587 | 24 | "next_batch fault injection"); |
588 | 24 | }); |
589 | 24 | if (!status.ok()) { |
590 | 12 | if (status.is<ErrorCode::END_OF_FILE>()) { |
591 | 12 | break; |
592 | 12 | } |
593 | 12 | LOG(WARNING) |
594 | 0 | << "failed to read next block when schema change for inverted index." |
595 | 0 | << ", err=" << status.to_string(); |
596 | 0 | return status; |
597 | 12 | } |
598 | | |
599 | | // write inverted index data, or ann index data |
600 | 12 | status = _write_inverted_index_data(output_rowset_schema, iter->data_id(), |
601 | 12 | block.get()); |
602 | 12 | DBUG_EXECUTE_IF( |
603 | 12 | "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", { |
604 | 12 | status = Status::Error<ErrorCode::INTERNAL_ERROR>( |
605 | 12 | "debug point: " |
606 | 12 | "handle_single_rowset_write_inverted_index_data_error"); |
607 | 12 | }) |
608 | 12 | if (!status.ok()) { |
609 | 0 | return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
610 | 0 | "failed to write block."); |
611 | 0 | } |
612 | 12 | block->clear_column_data(); |
613 | 12 | } |
614 | | |
615 | | // finish write inverted index, flush data to compound file |
616 | 16 | for (auto& writer_sign : inverted_index_writer_signs) { |
617 | 16 | try { |
618 | 16 | if (_index_column_writers[writer_sign]) { |
619 | 16 | RETURN_IF_ERROR(_index_column_writers[writer_sign]->finish()); |
620 | 16 | } |
621 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", { |
622 | 16 | _CLTHROWA(CL_ERR_IO, |
623 | 16 | "debug point: handle_single_rowset_index_build_finish_error"); |
624 | 16 | }) |
625 | 16 | } catch (const std::exception& e) { |
626 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
627 | 0 | "CLuceneError occured: {}", e.what()); |
628 | 0 | } |
629 | 16 | } |
630 | | |
631 | 12 | _olap_data_convertor->reset(); |
632 | 12 | } |
633 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
634 | 15 | auto st = index_file_writer->begin_close(); |
635 | 15 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", { |
636 | 15 | st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
637 | 15 | "debug point: handle_single_rowset_file_writer_close_error"); |
638 | 15 | }) |
639 | 15 | if (!st.ok()) { |
640 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
641 | 0 | return st; |
642 | 0 | } |
643 | 15 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
644 | 15 | } |
645 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
646 | 15 | auto st = index_file_writer->finish_close(); |
647 | 15 | if (!st.ok()) { |
648 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
649 | 0 | return st; |
650 | 0 | } |
651 | 15 | } |
652 | 12 | _index_column_writers.clear(); |
653 | 12 | _index_file_writers.clear(); |
654 | 12 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
655 | 12 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
656 | 12 | inverted_index_size); |
657 | 12 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
658 | 12 | inverted_index_size); |
659 | 12 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
660 | 12 | } |
661 | | |
662 | 12 | return Status::OK(); |
663 | 16 | } |
664 | | |
665 | | Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, int64_t segment_idx, |
666 | 12 | Block* block) { |
667 | 12 | VLOG_DEBUG << "begin to write inverted/ann index"; |
668 | | // converter block data |
669 | 12 | _olap_data_convertor->set_source_content(block, 0, block->rows()); |
670 | 28 | for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) { |
671 | 16 | auto inverted_index = _alter_inverted_indexes[i]; |
672 | 16 | auto index_id = inverted_index.index_id; |
673 | 16 | auto column_name = inverted_index.columns[0]; |
674 | 16 | auto column_idx = tablet_schema->field_index(column_name); |
675 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative", |
676 | 16 | { column_idx = -1; }) |
677 | 16 | if (column_idx < 0) { |
678 | 1 | if (!inverted_index.column_unique_ids.empty()) { |
679 | 1 | auto column_unique_id = inverted_index.column_unique_ids[0]; |
680 | 1 | column_idx = tablet_schema->field_index(column_unique_id); |
681 | 1 | } |
682 | 1 | if (column_idx < 0) { |
683 | 0 | LOG(WARNING) << "referenced column was missing. " |
684 | 0 | << "[column=" << column_name << " referenced_column=" << column_idx |
685 | 0 | << "]"; |
686 | 0 | continue; |
687 | 0 | } |
688 | 1 | } |
689 | 16 | auto column = tablet_schema->column(column_idx); |
690 | 16 | auto writer_sign = std::make_pair(segment_idx, index_id); |
691 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
692 | 16 | auto converted_result = _olap_data_convertor->convert_column_data(i); |
693 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", { |
694 | 16 | converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>( |
695 | 16 | "debug point: _write_inverted_index_data_convert_column_data_error"); |
696 | 16 | }) |
697 | 16 | if (converted_result.first != Status::OK()) { |
698 | 0 | LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first; |
699 | 0 | return converted_result.first; |
700 | 0 | } |
701 | 16 | const auto* ptr = (const uint8_t*)converted_result.second->get_data(); |
702 | 16 | const auto* null_map = converted_result.second->get_nullmap(); |
703 | 16 | if (null_map) { |
704 | 0 | RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(), null_map, &ptr, |
705 | 0 | block->rows())); |
706 | 16 | } else { |
707 | 16 | RETURN_IF_ERROR(_add_data(column_name, writer_sign, field.get(), &ptr, block->rows())); |
708 | 16 | } |
709 | 16 | } |
710 | 12 | _olap_data_convertor->clear_source_content(); |
711 | | |
712 | 12 | return Status::OK(); |
713 | 12 | } |
714 | | |
715 | | Status IndexBuilder::_add_nullable(const std::string& column_name, |
716 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
717 | | StorageField* field, const uint8_t* null_map, |
718 | 0 | const uint8_t** ptr, size_t num_rows) { |
719 | | // TODO: need to process null data for inverted index |
720 | 0 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
721 | 0 | DCHECK(field->get_sub_field_count() == 1); |
722 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
723 | 0 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
724 | | // total number length |
725 | 0 | auto offset_data = *(data_ptr + 1); |
726 | 0 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
727 | 0 | try { |
728 | 0 | auto data = *(data_ptr + 2); |
729 | 0 | auto nested_null_map = *(data_ptr + 3); |
730 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
731 | 0 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
732 | 0 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
733 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", { |
734 | 0 | _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error"); |
735 | 0 | }) |
736 | 0 | RETURN_IF_ERROR( |
737 | 0 | _index_column_writers[index_writer_sign]->add_array_nulls(null_map, num_rows)); |
738 | 0 | } catch (const std::exception& e) { |
739 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
740 | 0 | "CLuceneError occured: {}", e.what()); |
741 | 0 | } |
742 | | |
743 | 0 | return Status::OK(); |
744 | 0 | } |
745 | 0 | size_t offset = 0; |
746 | 0 | auto next_run_step = [&]() { |
747 | 0 | size_t step = 1; |
748 | 0 | for (auto i = offset + 1; i < num_rows; ++i) { |
749 | 0 | if (null_map[offset] == null_map[i]) { |
750 | 0 | step++; |
751 | 0 | } else { |
752 | 0 | break; |
753 | 0 | } |
754 | 0 | } |
755 | 0 | return step; |
756 | 0 | }; |
757 | 0 | try { |
758 | 0 | do { |
759 | 0 | auto step = next_run_step(); |
760 | 0 | if (null_map[offset]) { |
761 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_nulls( |
762 | 0 | static_cast<uint32_t>(step))); |
763 | 0 | } else { |
764 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, |
765 | 0 | *ptr, step)); |
766 | 0 | } |
767 | 0 | *ptr += field->size() * step; |
768 | 0 | offset += step; |
769 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception", |
770 | 0 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); }) |
771 | 0 | } while (offset < num_rows); |
772 | 0 | } catch (const std::exception& e) { |
773 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
774 | 0 | e.what()); |
775 | 0 | } |
776 | | |
777 | 0 | return Status::OK(); |
778 | 0 | } |
779 | | |
780 | | Status IndexBuilder::_add_data(const std::string& column_name, |
781 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
782 | 16 | StorageField* field, const uint8_t** ptr, size_t num_rows) { |
783 | 16 | try { |
784 | 16 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
785 | 2 | DCHECK(field->get_sub_field_count() == 1); |
786 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
787 | 2 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
788 | | // total number length |
789 | 2 | auto element_cnt = size_t((unsigned long)(*data_ptr)); |
790 | 2 | auto offset_data = *(data_ptr + 1); |
791 | 2 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
792 | 2 | if (element_cnt > 0) { |
793 | 2 | auto data = *(data_ptr + 2); |
794 | 2 | auto nested_null_map = *(data_ptr + 3); |
795 | 2 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
796 | 2 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
797 | 2 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
798 | 2 | } |
799 | 14 | } else { |
800 | 14 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, *ptr, |
801 | 14 | num_rows)); |
802 | 14 | } |
803 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception", |
804 | 16 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); }) |
805 | 16 | } catch (const std::exception& e) { |
806 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
807 | 0 | e.what()); |
808 | 0 | } |
809 | | |
810 | 16 | return Status::OK(); |
811 | 16 | } |
812 | | |
813 | 17 | Status IndexBuilder::handle_inverted_index_data() { |
814 | 17 | LOG(INFO) << "begin to handle_inverted_index_data"; |
815 | 17 | DCHECK(_input_rowsets.size() == _output_rowsets.size()); |
816 | 17 | for (auto& _output_rowset : _output_rowsets) { |
817 | 17 | SegmentCacheHandle segment_cache_handle; |
818 | 17 | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
819 | 17 | std::static_pointer_cast<BetaRowset>(_output_rowset), &segment_cache_handle)); |
820 | 17 | auto output_rowset_meta = _output_rowset->rowset_meta(); |
821 | 17 | auto& segments = segment_cache_handle.get_segments(); |
822 | 17 | RETURN_IF_ERROR(handle_single_rowset(output_rowset_meta, segments)); |
823 | 17 | } |
824 | 16 | return Status::OK(); |
825 | 17 | } |
826 | | |
827 | 21 | Status IndexBuilder::do_build_inverted_index() { |
828 | 21 | LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id() |
829 | 21 | << ", is_drop_op=" << _is_drop_op; |
830 | 21 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty", |
831 | 21 | { _alter_inverted_indexes.clear(); }) |
832 | 21 | if (_alter_inverted_indexes.empty()) { |
833 | 0 | return Status::OK(); |
834 | 0 | } |
835 | | |
836 | 21 | static constexpr long TRY_LOCK_TIMEOUT = 30; |
837 | 21 | std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::defer_lock); |
838 | 21 | bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT)); |
839 | | |
840 | 21 | if (!owns_lock) { |
841 | 0 | return Status::ObtainLockFailed( |
842 | 0 | "try schema_change_lock failed. There might be schema change or cooldown running " |
843 | 0 | "on " |
844 | 0 | "tablet={} ", |
845 | 0 | _tablet->tablet_id()); |
846 | 0 | } |
847 | | // Check executing serially with compaction task. |
848 | 21 | std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(), |
849 | 21 | std::try_to_lock); |
850 | 21 | if (!base_compaction_lock.owns_lock()) { |
851 | 0 | return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ", |
852 | 0 | _tablet->tablet_id()); |
853 | 0 | } |
854 | 21 | std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), |
855 | 21 | std::try_to_lock); |
856 | 21 | if (!cumu_compaction_lock.owns_lock()) { |
857 | 0 | return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}", |
858 | 0 | _tablet->tablet_id()); |
859 | 0 | } |
860 | | |
861 | 21 | std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(), |
862 | 21 | std::try_to_lock); |
863 | 21 | if (!cold_compaction_lock.owns_lock()) { |
864 | 0 | return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}", |
865 | 0 | _tablet->tablet_id()); |
866 | 0 | } |
867 | | |
868 | 21 | std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(), |
869 | 21 | std::try_to_lock); |
870 | 21 | if (!build_inverted_index_lock.owns_lock()) { |
871 | 0 | return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}", |
872 | 0 | _tablet->tablet_id()); |
873 | 0 | } |
874 | | |
875 | 21 | std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock); |
876 | 21 | if (!migration_rlock.owns_lock()) { |
877 | 0 | return Status::ObtainLockFailed("got migration_rlock failed. tablet={}", |
878 | 0 | _tablet->tablet_id()); |
879 | 0 | } |
880 | | |
881 | 21 | DCHECK(!_alter_index_ids.empty()); |
882 | 21 | _input_rowsets = |
883 | 21 | _tablet->pick_candidate_rowsets_to_build_inverted_index(_alter_index_ids, _is_drop_op); |
884 | 21 | if (_input_rowsets.empty()) { |
885 | 1 | LOG(INFO) << "_input_rowsets is empty"; |
886 | 1 | return Status::OK(); |
887 | 1 | } |
888 | | |
889 | 20 | auto st = update_inverted_index_info(); |
890 | 20 | if (!st.ok()) { |
891 | 3 | LOG(WARNING) << "failed to update_inverted_index_info. " |
892 | 3 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
893 | 3 | gc_output_rowset(); |
894 | 3 | return st; |
895 | 3 | } |
896 | | |
897 | | // create inverted index file for output rowset |
898 | 17 | st = handle_inverted_index_data(); |
899 | 17 | if (!st.ok()) { |
900 | 1 | LOG(WARNING) << "failed to handle_inverted_index_data. " |
901 | 1 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
902 | 1 | gc_output_rowset(); |
903 | 1 | return st; |
904 | 1 | } |
905 | | |
906 | | // modify rowsets in memory |
907 | 16 | st = modify_rowsets(); |
908 | 16 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", { |
909 | 16 | st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>( |
910 | 16 | "debug point: do_build_inverted_index_modify_rowsets_status_error"); |
911 | 16 | }) |
912 | 16 | if (!st.ok()) { |
913 | 0 | LOG(WARNING) << "failed to modify rowsets in memory. " |
914 | 0 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
915 | 0 | gc_output_rowset(); |
916 | 0 | return st; |
917 | 0 | } |
918 | 16 | return Status::OK(); |
919 | 16 | } |
920 | | |
921 | 16 | Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { |
922 | 16 | DCHECK(std::ranges::all_of( |
923 | 16 | _output_rowsets.begin(), _output_rowsets.end(), [&engine = _engine](auto&& rs) { |
924 | 16 | if (engine.check_rowset_id_in_unused_rowsets(rs->rowset_id())) { |
925 | 16 | LOG(ERROR) << "output rowset: " << rs->rowset_id() << " in unused rowsets"; |
926 | 16 | return false; |
927 | 16 | } |
928 | 16 | return true; |
929 | 16 | })); |
930 | | |
931 | 16 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
932 | 16 | _tablet->enable_unique_key_merge_on_write()) { |
933 | 1 | std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock()); |
934 | 1 | std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); |
935 | 1 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
936 | 1 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id()); |
937 | 2 | for (auto i = 0; i < _input_rowsets.size(); ++i) { |
938 | 1 | RowsetId input_rowset_id = _input_rowsets[i]->rowset_id(); |
939 | 1 | RowsetId output_rowset_id = _output_rowsets[i]->rowset_id(); |
940 | 1 | for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) { |
941 | 0 | RowsetId rs_id = std::get<0>(k); |
942 | 0 | if (rs_id == input_rowset_id) { |
943 | 0 | DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k), |
944 | 0 | std::get<2>(k)}; |
945 | 0 | auto res = delete_bitmap->set(output_rs_key, v); |
946 | 0 | DCHECK(res > 0) << "delete_bitmap set failed, res=" << res; |
947 | 0 | } |
948 | 0 | } |
949 | 1 | } |
950 | 1 | _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); |
951 | | |
952 | | // modify_rowsets will remove the delete_bitmap for input rowsets, |
953 | | // should call it after merge delete_bitmap |
954 | 1 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
955 | 15 | } else { |
956 | 15 | std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); |
957 | 15 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
958 | 15 | } |
959 | | |
960 | | #ifndef BE_TEST |
961 | | { |
962 | | std::shared_lock rlock(_tablet->get_header_lock()); |
963 | | _tablet->save_meta(); |
964 | | } |
965 | | #endif |
966 | 16 | return Status::OK(); |
967 | 16 | } |
968 | | |
969 | 4 | void IndexBuilder::gc_output_rowset() { |
970 | 4 | for (auto&& output_rowset : _output_rowsets) { |
971 | 2 | auto is_local_rowset = output_rowset->is_local(); |
972 | 2 | DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset", |
973 | 2 | { is_local_rowset = false; }) |
974 | 2 | if (!is_local_rowset) { |
975 | 0 | _tablet->record_unused_remote_rowset(output_rowset->rowset_id(), |
976 | 0 | output_rowset->rowset_meta()->resource_id(), |
977 | 0 | output_rowset->num_segments()); |
978 | 0 | return; |
979 | 0 | } |
980 | 2 | _engine.add_unused_rowset(output_rowset); |
981 | 2 | } |
982 | 4 | } |
983 | | |
984 | | #include "common/compile_check_end.h" |
985 | | } // namespace doris |