be/src/storage/task/index_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/task/index_builder.h" |
19 | | |
20 | | #include <mutex> |
21 | | |
22 | | #include "common/logging.h" |
23 | | #include "common/status.h" |
24 | | #include "storage/field.h" |
25 | | #include "storage/index/index_file_reader.h" |
26 | | #include "storage/index/index_file_writer.h" |
27 | | #include "storage/index/inverted/inverted_index_desc.h" |
28 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
29 | | #include "storage/olap_define.h" |
30 | | #include "storage/rowset/beta_rowset.h" |
31 | | #include "storage/rowset/rowset_writer_context.h" |
32 | | #include "storage/segment/segment_loader.h" |
33 | | #include "storage/storage_engine.h" |
34 | | #include "storage/tablet/tablet_schema.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/trace.h" |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | |
41 | | IndexBuilder::IndexBuilder(StorageEngine& engine, TabletSharedPtr tablet, |
42 | | const std::vector<TColumn>& columns, |
43 | | const std::vector<doris::TOlapTableIndex>& alter_inverted_indexes, |
44 | | bool is_drop_op) |
45 | 22 | : _engine(engine), |
46 | 22 | _tablet(std::move(tablet)), |
47 | 22 | _columns(columns), |
48 | 22 | _alter_inverted_indexes(alter_inverted_indexes), |
49 | 22 | _is_drop_op(is_drop_op) { |
50 | 22 | _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>(); |
51 | 22 | } |
52 | | |
53 | 22 | IndexBuilder::~IndexBuilder() { |
54 | 22 | _olap_data_convertor.reset(); |
55 | 22 | _index_column_writers.clear(); |
56 | 22 | } |
57 | | |
58 | 22 | Status IndexBuilder::init() { |
59 | 24 | for (auto inverted_index : _alter_inverted_indexes) { |
60 | 24 | _alter_index_ids.insert(inverted_index.index_id); |
61 | 24 | } |
62 | 22 | return Status::OK(); |
63 | 22 | } |
64 | | |
65 | 18 | Status IndexBuilder::update_inverted_index_info() { |
66 | | // just do link files |
67 | 18 | LOG(INFO) << "begin to update_inverted_index_info, tablet=" << _tablet->tablet_id() |
68 | 18 | << ", is_drop_op=" << _is_drop_op; |
69 | | // index ids that will not be linked |
70 | 18 | std::set<int64_t> without_index_uids; |
71 | 18 | _output_rowsets.reserve(_input_rowsets.size()); |
72 | 18 | _pending_rs_guards.reserve(_input_rowsets.size()); |
73 | 18 | for (auto&& input_rowset : _input_rowsets) { |
74 | 18 | bool is_local_rowset = input_rowset->is_local(); |
75 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset", |
76 | 18 | { is_local_rowset = false; }) |
77 | 18 | if (!is_local_rowset) [[unlikely]] { |
78 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id(); |
79 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
80 | 0 | _tablet->tablet_id(), |
81 | 0 | input_rowset->rowset_id().to_string()); |
82 | 0 | } |
83 | | |
84 | 18 | TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>(); |
85 | 18 | const auto& input_rs_tablet_schema = input_rowset->tablet_schema(); |
86 | 18 | output_rs_tablet_schema->copy_from(*input_rs_tablet_schema); |
87 | 18 | int64_t total_index_size = 0; |
88 | 18 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); |
89 | 18 | auto size_st = beta_rowset->get_inverted_index_size(&total_index_size); |
90 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", { |
91 | 18 | size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed"); |
92 | 18 | }) |
93 | 18 | if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() && |
94 | 18 | !size_st.is<ErrorCode::NOT_FOUND>()) { |
95 | 0 | return size_st; |
96 | 0 | } |
97 | 18 | auto num_segments = input_rowset->num_segments(); |
98 | 18 | size_t drop_index_size = 0; |
99 | | |
100 | 18 | if (_is_drop_op) { |
101 | 4 | for (const auto& t_inverted_index : _alter_inverted_indexes) { |
102 | 4 | DCHECK_EQ(t_inverted_index.columns.size(), 1); |
103 | 4 | auto column_name = t_inverted_index.columns[0]; |
104 | 4 | auto column_idx = output_rs_tablet_schema->field_index(column_name); |
105 | 4 | if (column_idx < 0) { |
106 | 0 | if (!t_inverted_index.column_unique_ids.empty()) { |
107 | 0 | auto column_unique_id = t_inverted_index.column_unique_ids[0]; |
108 | 0 | column_idx = output_rs_tablet_schema->field_index(column_unique_id); |
109 | 0 | } |
110 | 0 | if (column_idx < 0) { |
111 | 0 | LOG(WARNING) << "referenced column was missing. " |
112 | 0 | << "[column=" << column_name |
113 | 0 | << " referenced_column=" << column_idx << "]"; |
114 | 0 | continue; |
115 | 0 | } |
116 | 0 | } |
117 | 4 | auto column = output_rs_tablet_schema->column(column_idx); |
118 | | |
119 | | // inverted index |
120 | 4 | auto index_metas = output_rs_tablet_schema->inverted_indexs(column); |
121 | 4 | for (const auto& index_meta : index_metas) { |
122 | | // Only drop the index that matches the requested index_id, |
123 | | // not all indexes on this column |
124 | 4 | if (index_meta->index_id() != t_inverted_index.index_id) { |
125 | 1 | continue; |
126 | 1 | } |
127 | 3 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
128 | 3 | InvertedIndexStorageFormatPB::V1) { |
129 | 1 | const auto& fs = io::global_local_filesystem(); |
130 | | |
131 | 2 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
132 | 1 | auto seg_path = local_segment_path( |
133 | 1 | _tablet->tablet_path(), input_rowset->rowset_id().to_string(), |
134 | 1 | seg_id); |
135 | 1 | auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( |
136 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
137 | 1 | index_meta->index_id(), index_meta->get_index_suffix()); |
138 | 1 | int64_t index_size = 0; |
139 | 1 | RETURN_IF_ERROR(fs->file_size(index_path, &index_size)); |
140 | 1 | VLOG_DEBUG << "inverted index file:" << index_path |
141 | 0 | << " size:" << index_size; |
142 | 1 | drop_index_size += index_size; |
143 | 1 | } |
144 | 1 | } |
145 | 3 | _dropped_inverted_indexes.push_back(*index_meta); |
146 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
147 | | // remove dropped index_meta from output rowset tablet schema |
148 | 3 | output_rs_tablet_schema->remove_index(index_meta->index_id()); |
149 | 3 | } |
150 | | |
151 | | // ann index |
152 | 4 | const auto* ann_index = output_rs_tablet_schema->ann_index(column); |
153 | 4 | if (!ann_index) { |
154 | 3 | continue; |
155 | 3 | } |
156 | | // Only drop the ann index that matches the requested index_id |
157 | 1 | if (ann_index->index_id() != t_inverted_index.index_id) { |
158 | 0 | continue; |
159 | 0 | } |
160 | 1 | DCHECK(output_rs_tablet_schema->get_inverted_index_storage_format() != |
161 | 1 | InvertedIndexStorageFormatPB::V1); |
162 | 1 | _dropped_inverted_indexes.push_back(*ann_index); |
163 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
164 | | // remove dropped index_meta from output rowset tablet schema |
165 | 1 | output_rs_tablet_schema->remove_index(ann_index->index_id()); |
166 | 1 | } |
167 | | |
168 | 4 | DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", { |
169 | 4 | auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
170 | 4 | "index_builder.update_inverted_index_info.drop_index", "indexes_count", 0); |
171 | 4 | if (indexes_count < 0) { |
172 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
173 | 4 | "indexes count cannot be negative"); |
174 | 4 | } |
175 | 4 | auto indexes_size = output_rs_tablet_schema->inverted_indexes().size(); |
176 | 4 | if (indexes_count != indexes_size) { |
177 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
178 | 4 | "indexes count not equal to expected"); |
179 | 4 | } |
180 | 4 | }) |
181 | 14 | } else { |
182 | | // base on input rowset's tablet_schema to build |
183 | | // output rowset's tablet_schema which only add |
184 | | // the indexes specified in this build index request |
185 | 16 | for (auto t_inverted_index : _alter_inverted_indexes) { |
186 | 16 | TabletIndex index; |
187 | 16 | index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema); |
188 | 16 | auto column_uid = index.col_unique_ids()[0]; |
189 | 16 | if (column_uid < 0) { |
190 | 3 | LOG(WARNING) << "referenced column was missing. " |
191 | 3 | << "[column=" << t_inverted_index.columns[0] |
192 | 3 | << " referenced_column=" << column_uid << "]"; |
193 | 3 | continue; |
194 | 3 | } |
195 | 13 | const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid); |
196 | | |
197 | | // inverted index |
198 | 13 | auto exist_indexs = output_rs_tablet_schema->inverted_indexs(col); |
199 | 13 | for (const auto& exist_index : exist_indexs) { |
200 | 0 | if (exist_index->index_id() != index.index_id()) { |
201 | 0 | if (exist_index->is_same_except_id(&index)) { |
202 | 0 | LOG(WARNING) << fmt::format( |
203 | 0 | "column: {} has a exist inverted index, but the index id not " |
204 | 0 | "equal " |
205 | 0 | "request's index id, , exist index id: {}, request's index id: " |
206 | 0 | "{}, " |
207 | 0 | "remove exist index in new output_rs_tablet_schema", |
208 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
209 | 0 | without_index_uids.insert(exist_index->index_id()); |
210 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
211 | 0 | } |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | | // ann index |
216 | 13 | const auto* exist_index = output_rs_tablet_schema->ann_index(col); |
217 | 13 | if (exist_index && exist_index->index_id() != index.index_id()) { |
218 | 0 | if (exist_index->is_same_except_id(&index)) { |
219 | 0 | LOG(WARNING) << fmt::format( |
220 | 0 | "column: {} has a exist ann index, but the index id not " |
221 | 0 | "equal request's index id, , exist index id: {}, request's index " |
222 | 0 | "id: {}, remove exist index in new output_rs_tablet_schema", |
223 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
224 | 0 | without_index_uids.insert(exist_index->index_id()); |
225 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
226 | 0 | } |
227 | 0 | } |
228 | | |
229 | 13 | output_rs_tablet_schema->append_index(std::move(index)); |
230 | 13 | } |
231 | 14 | } |
232 | | // construct input rowset reader |
233 | 18 | RowsetReaderSharedPtr input_rs_reader; |
234 | 18 | RETURN_IF_ERROR(input_rowset->create_reader(&input_rs_reader)); |
235 | | // construct output rowset writer |
236 | 18 | RowsetWriterContext context; |
237 | 18 | context.version = input_rs_reader->version(); |
238 | 18 | context.rowset_state = VISIBLE; |
239 | 18 | context.segments_overlap = input_rowset->rowset_meta()->segments_overlap(); |
240 | 18 | context.tablet_schema = output_rs_tablet_schema; |
241 | 18 | context.newest_write_timestamp = input_rs_reader->newest_write_timestamp(); |
242 | 18 | auto output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); |
243 | 18 | _pending_rs_guards.push_back(_engine.add_pending_rowset(context)); |
244 | | |
245 | | // if without_index_uids is not empty, copy _alter_index_ids to it |
246 | | // else just use _alter_index_ids to avoid copy |
247 | 18 | if (!without_index_uids.empty()) { |
248 | 0 | without_index_uids.insert(_alter_index_ids.begin(), _alter_index_ids.end()); |
249 | 0 | } |
250 | | |
251 | | // build output rowset |
252 | 18 | RETURN_IF_ERROR(input_rowset->link_files_to( |
253 | 18 | _tablet->tablet_path(), output_rs_writer->rowset_id(), 0, |
254 | 18 | without_index_uids.empty() ? &_alter_index_ids : &without_index_uids)); |
255 | | |
256 | 18 | auto input_rowset_meta = input_rowset->rowset_meta(); |
257 | 18 | RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>(); |
258 | 18 | rowset_meta->set_num_rows(input_rowset_meta->num_rows()); |
259 | 18 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
260 | 18 | InvertedIndexStorageFormatPB::V1) { |
261 | 4 | if (_is_drop_op) { |
262 | 1 | VLOG_DEBUG << "data_disk_size:" << input_rowset_meta->data_disk_size() |
263 | 0 | << " total_disk_size:" << input_rowset_meta->total_disk_size() |
264 | 0 | << " index_disk_size:" << input_rowset_meta->index_disk_size() |
265 | 0 | << " drop_index_size:" << drop_index_size; |
266 | 1 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
267 | 1 | drop_index_size); |
268 | 1 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
269 | 1 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
270 | 1 | drop_index_size); |
271 | 3 | } else { |
272 | 3 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size()); |
273 | 3 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
274 | 3 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size()); |
275 | 3 | } |
276 | 14 | } else { |
277 | 31 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
278 | 17 | auto seg_path = DORIS_TRY(input_rowset->segment_path(seg_id)); |
279 | 17 | auto idx_file_reader = std::make_unique<IndexFileReader>( |
280 | 17 | context.fs(), |
281 | 17 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, |
282 | 17 | output_rs_tablet_schema->get_inverted_index_storage_format()); |
283 | 17 | auto st = idx_file_reader->init(); |
284 | 17 | DBUG_EXECUTE_IF( |
285 | 17 | "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", { |
286 | 17 | st = Status::Error<ErrorCode::INIT_FAILED>( |
287 | 17 | "debug point: reader init error"); |
288 | 17 | }) |
289 | 17 | if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) { |
290 | 0 | return st; |
291 | 0 | } |
292 | 17 | _index_file_readers.emplace( |
293 | 17 | std::make_pair(output_rs_writer->rowset_id().to_string(), seg_id), |
294 | 17 | std::move(idx_file_reader)); |
295 | 17 | } |
296 | 14 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
297 | 14 | total_index_size); |
298 | 14 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
299 | 14 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
300 | 14 | total_index_size); |
301 | 14 | } |
302 | 18 | rowset_meta->set_empty(input_rowset_meta->empty()); |
303 | 18 | rowset_meta->set_num_segments(input_rowset_meta->num_segments()); |
304 | 18 | rowset_meta->set_segments_overlap(input_rowset_meta->segments_overlap()); |
305 | 18 | rowset_meta->set_rowset_state(input_rowset_meta->rowset_state()); |
306 | 18 | std::vector<KeyBoundsPB> key_bounds; |
307 | 18 | RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds)); |
308 | 18 | rowset_meta->set_segments_key_bounds_truncated( |
309 | 18 | input_rowset_meta->is_segments_key_bounds_truncated()); |
310 | 18 | rowset_meta->set_segments_key_bounds(key_bounds); |
311 | 18 | std::vector<uint32_t> num_segment_rows; |
312 | 18 | input_rowset_meta->get_num_segment_rows(&num_segment_rows); |
313 | 18 | rowset_meta->set_num_segment_rows(num_segment_rows); |
314 | 18 | auto output_rowset = output_rs_writer->manual_build(rowset_meta); |
315 | 18 | if (input_rowset_meta->has_delete_predicate()) { |
316 | 0 | output_rowset->rowset_meta()->set_delete_predicate( |
317 | 0 | input_rowset_meta->delete_predicate()); |
318 | 0 | } |
319 | 18 | _output_rowsets.push_back(output_rowset); |
320 | 18 | } |
321 | | |
322 | 18 | return Status::OK(); |
323 | 18 | } |
324 | | |
325 | | Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta, |
326 | 16 | std::vector<segment_v2::SegmentSharedPtr>& segments) { |
327 | 16 | bool is_local_rowset = output_rowset_meta->is_local(); |
328 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset", |
329 | 16 | { is_local_rowset = false; }) |
330 | 16 | if (!is_local_rowset) [[unlikely]] { |
331 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id(); |
332 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
333 | 0 | _tablet->tablet_id(), |
334 | 0 | output_rowset_meta->rowset_id().to_string()); |
335 | 0 | } |
336 | | |
337 | 16 | if (_is_drop_op) { |
338 | 4 | const auto& output_rs_tablet_schema = output_rowset_meta->tablet_schema(); |
339 | 4 | if (output_rs_tablet_schema->get_inverted_index_storage_format() != |
340 | 4 | InvertedIndexStorageFormatPB::V1) { |
341 | 3 | const auto& fs = output_rowset_meta->fs(); |
342 | | |
343 | 3 | const auto& output_rowset_schema = output_rowset_meta->tablet_schema(); |
344 | 3 | size_t inverted_index_size = 0; |
345 | 3 | for (auto& seg_ptr : segments) { |
346 | 3 | auto idx_file_reader_iter = _index_file_readers.find( |
347 | 3 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
348 | 3 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op", |
349 | 3 | { idx_file_reader_iter = _index_file_readers.end(); }) |
350 | 3 | if (idx_file_reader_iter == _index_file_readers.end()) { |
351 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
352 | 0 | << seg_ptr->id() << " cannot be found"; |
353 | 0 | continue; |
354 | 0 | } |
355 | 3 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
356 | | |
357 | 3 | std::string index_path_prefix { |
358 | 3 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
359 | 3 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
360 | 3 | seg_ptr->id()))}; |
361 | | |
362 | 3 | std::string index_path = |
363 | 3 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
364 | 3 | io::FileWriterPtr file_writer; |
365 | 3 | Status st = fs->create_file(index_path, &file_writer); |
366 | 3 | if (!st.ok()) { |
367 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
368 | 0 | << ", err: " << st; |
369 | 0 | return st; |
370 | 0 | } |
371 | 3 | auto index_file_writer = std::make_unique<IndexFileWriter>( |
372 | 3 | fs, std::move(index_path_prefix), |
373 | 3 | output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), |
374 | 3 | output_rowset_schema->get_inverted_index_storage_format(), |
375 | 3 | std::move(file_writer)); |
376 | 3 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
377 | | // create inverted index writer |
378 | 3 | for (auto& index_meta : _dropped_inverted_indexes) { |
379 | 3 | RETURN_IF_ERROR(index_file_writer->delete_index(&index_meta)); |
380 | 3 | } |
381 | 3 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
382 | 3 | } |
383 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
384 | 3 | auto st = index_file_writer->begin_close(); |
385 | 3 | if (!st.ok()) { |
386 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
387 | 0 | return st; |
388 | 0 | } |
389 | 3 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
390 | 3 | } |
391 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
392 | 3 | auto st = index_file_writer->finish_close(); |
393 | 3 | if (!st.ok()) { |
394 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
395 | 0 | return st; |
396 | 0 | } |
397 | 3 | } |
398 | 3 | _index_file_writers.clear(); |
399 | 3 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
400 | 3 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
401 | 3 | inverted_index_size); |
402 | 3 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
403 | 3 | inverted_index_size); |
404 | 3 | } |
405 | 4 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
406 | 4 | return Status::OK(); |
407 | 12 | } else { |
408 | | // create inverted or ann index writer |
409 | 12 | const auto& fs = output_rowset_meta->fs(); |
410 | 12 | auto output_rowset_schema = output_rowset_meta->tablet_schema(); |
411 | 12 | size_t inverted_index_size = 0; |
412 | 15 | for (auto& seg_ptr : segments) { |
413 | 15 | std::string index_path_prefix { |
414 | 15 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
415 | 15 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
416 | 15 | seg_ptr->id()))}; |
417 | 15 | std::vector<ColumnId> return_columns; |
418 | 15 | std::vector<std::pair<int64_t, int64_t>> inverted_index_writer_signs; |
419 | 15 | _olap_data_convertor->reserve(_alter_inverted_indexes.size()); |
420 | | |
421 | 15 | std::unique_ptr<IndexFileWriter> index_file_writer = nullptr; |
422 | 15 | if (output_rowset_schema->get_inverted_index_storage_format() >= |
423 | 15 | InvertedIndexStorageFormatPB::V2) { |
424 | 12 | auto idx_file_reader_iter = _index_file_readers.find( |
425 | 12 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
426 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader", |
427 | 12 | { idx_file_reader_iter = _index_file_readers.end(); }) |
428 | 12 | if (idx_file_reader_iter == _index_file_readers.end()) { |
429 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
430 | 0 | << seg_ptr->id() << " cannot be found"; |
431 | 0 | continue; |
432 | 0 | } |
433 | 12 | std::string index_path = |
434 | 12 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
435 | 12 | io::FileWriterPtr file_writer; |
436 | 12 | Status st = fs->create_file(index_path, &file_writer); |
437 | 12 | if (!st.ok()) { |
438 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
439 | 0 | << ", err: " << st; |
440 | 0 | return st; |
441 | 0 | } |
442 | 12 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
443 | 12 | index_file_writer = std::make_unique<IndexFileWriter>( |
444 | 12 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
445 | 12 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), |
446 | 12 | std::move(file_writer)); |
447 | 12 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
448 | 12 | } else { |
449 | 3 | index_file_writer = std::make_unique<IndexFileWriter>( |
450 | 3 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
451 | 3 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format()); |
452 | 3 | } |
453 | | // create inverted index writer, or ann index writer |
454 | 19 | for (auto inverted_index : _alter_inverted_indexes) { |
455 | 19 | DCHECK(inverted_index.index_type == TIndexType::INVERTED || |
456 | 19 | inverted_index.index_type == TIndexType::ANN); |
457 | 19 | DCHECK_EQ(inverted_index.columns.size(), 1); |
458 | 19 | auto index_id = inverted_index.index_id; |
459 | 19 | auto column_name = inverted_index.columns[0]; |
460 | 19 | auto column_idx = output_rowset_schema->field_index(column_name); |
461 | 19 | if (column_idx < 0) { |
462 | 4 | if (inverted_index.__isset.column_unique_ids && |
463 | 4 | !inverted_index.column_unique_ids.empty()) { |
464 | 1 | column_idx = output_rowset_schema->field_index( |
465 | 1 | inverted_index.column_unique_ids[0]); |
466 | 1 | } |
467 | 4 | if (column_idx < 0) { |
468 | 3 | LOG(WARNING) << "referenced column was missing. " |
469 | 3 | << "[column=" << column_name |
470 | 3 | << " referenced_column=" << column_idx << "]"; |
471 | 3 | continue; |
472 | 3 | } |
473 | 4 | } |
474 | 16 | auto column = output_rowset_schema->column(column_idx); |
475 | | // variant column is not support for building index |
476 | 16 | auto is_support_inverted_index = |
477 | 16 | IndexColumnWriter::check_support_inverted_index(column); |
478 | 16 | auto is_support_ann_index = IndexColumnWriter::check_support_ann_index(column); |
479 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index", |
480 | 16 | { is_support_inverted_index = false; }) |
481 | 16 | if (!is_support_inverted_index && !is_support_ann_index) { |
482 | 0 | continue; |
483 | 0 | } |
484 | 16 | DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id)); |
485 | 16 | _olap_data_convertor->add_column_data_convertor(column); |
486 | 16 | return_columns.emplace_back(column_idx); |
487 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
488 | | |
489 | 16 | if (inverted_index.index_type == TIndexType::INVERTED) { |
490 | | // inverted index |
491 | 15 | auto index_metas = output_rowset_schema->inverted_indexs(column); |
492 | 15 | for (const auto& index_meta : index_metas) { |
493 | 15 | if (index_meta->index_id() != index_id) { |
494 | 0 | continue; |
495 | 0 | } |
496 | 15 | std::unique_ptr<segment_v2::IndexColumnWriter> inverted_index_builder; |
497 | 15 | try { |
498 | 15 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
499 | 15 | field.get(), &inverted_index_builder, index_file_writer.get(), |
500 | 15 | index_meta)); |
501 | 15 | DBUG_EXECUTE_IF( |
502 | 15 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
503 | 15 | "error", |
504 | 15 | { |
505 | 15 | _CLTHROWA(CL_ERR_IO, |
506 | 15 | "debug point: " |
507 | 15 | "handle_single_rowset_index_column_writer_create_" |
508 | 15 | "error"); |
509 | 15 | }) |
510 | 15 | } catch (const std::exception& e) { |
511 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
512 | 0 | "CLuceneError occured: {}", e.what()); |
513 | 0 | } |
514 | | |
515 | 15 | if (inverted_index_builder) { |
516 | 15 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
517 | 15 | _index_column_writers.insert( |
518 | 15 | std::make_pair(writer_sign, std::move(inverted_index_builder))); |
519 | 15 | inverted_index_writer_signs.emplace_back(writer_sign); |
520 | 15 | } |
521 | 15 | } |
522 | 15 | } else if (inverted_index.index_type == TIndexType::ANN) { |
523 | | // ann index |
524 | 1 | const auto* index_meta = output_rowset_schema->ann_index(column); |
525 | 1 | if (index_meta && index_meta->index_id() == index_id) { |
526 | 1 | std::unique_ptr<segment_v2::IndexColumnWriter> index_writer; |
527 | 1 | try { |
528 | 1 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
529 | 1 | field.get(), &index_writer, index_file_writer.get(), |
530 | 1 | index_meta)); |
531 | 1 | DBUG_EXECUTE_IF( |
532 | 1 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
533 | 1 | "error", |
534 | 1 | { |
535 | 1 | _CLTHROWA(CL_ERR_IO, |
536 | 1 | "debug point: " |
537 | 1 | "handle_single_rowset_index_column_writer_create_" |
538 | 1 | "error"); |
539 | 1 | }) |
540 | 1 | } catch (const std::exception& e) { |
541 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
542 | 0 | "CLuceneError occured: {}", e.what()); |
543 | 0 | } |
544 | | |
545 | 1 | if (index_writer) { |
546 | 1 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
547 | 1 | _index_column_writers.insert( |
548 | 1 | std::make_pair(writer_sign, std::move(index_writer))); |
549 | 1 | inverted_index_writer_signs.emplace_back(writer_sign); |
550 | 1 | } |
551 | 1 | } |
552 | 1 | } |
553 | 16 | } |
554 | | |
555 | | // DO NOT forget index_file_writer for the segment, otherwise, original inverted index will be deleted. |
556 | 15 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
557 | 15 | if (return_columns.empty()) { |
558 | | // no columns to read |
559 | 3 | continue; |
560 | 3 | } |
561 | | // create iterator for each segment |
562 | 12 | StorageReadOptions read_options; |
563 | 12 | OlapReaderStatistics stats; |
564 | 12 | read_options.stats = &stats; |
565 | 12 | read_options.tablet_schema = output_rowset_schema; |
566 | 12 | std::shared_ptr<Schema> schema = |
567 | 12 | std::make_shared<Schema>(output_rowset_schema->columns(), return_columns); |
568 | 12 | std::unique_ptr<RowwiseIterator> iter; |
569 | 12 | auto res = seg_ptr->new_iterator(schema, read_options, &iter); |
570 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", { |
571 | 12 | res = Status::Error<ErrorCode::INTERNAL_ERROR>( |
572 | 12 | "debug point: handle_single_rowset_create_iterator_error"); |
573 | 12 | }) |
574 | 12 | if (!res.ok()) { |
575 | 0 | LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() |
576 | 0 | << "]: " << res.to_string(); |
577 | 0 | return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string()); |
578 | 0 | } |
579 | | |
580 | 12 | auto block = Block::create_unique(output_rowset_schema->create_block(return_columns)); |
581 | 24 | while (true) { |
582 | 24 | auto status = iter->next_batch(block.get()); |
583 | 24 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", { |
584 | 24 | status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
585 | 24 | "next_batch fault injection"); |
586 | 24 | }); |
587 | 24 | if (!status.ok()) { |
588 | 12 | if (status.is<ErrorCode::END_OF_FILE>()) { |
589 | 12 | break; |
590 | 12 | } |
591 | 12 | LOG(WARNING) |
592 | 0 | << "failed to read next block when schema change for inverted index." |
593 | 0 | << ", err=" << status.to_string(); |
594 | 0 | return status; |
595 | 12 | } |
596 | | |
597 | | // write inverted index data, or ann index data |
598 | 12 | status = _write_inverted_index_data(output_rowset_schema, iter->data_id(), |
599 | 12 | block.get()); |
600 | 12 | DBUG_EXECUTE_IF( |
601 | 12 | "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", { |
602 | 12 | status = Status::Error<ErrorCode::INTERNAL_ERROR>( |
603 | 12 | "debug point: " |
604 | 12 | "handle_single_rowset_write_inverted_index_data_error"); |
605 | 12 | }) |
606 | 12 | if (!status.ok()) { |
607 | 0 | return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
608 | 0 | "failed to write block."); |
609 | 0 | } |
610 | 12 | block->clear_column_data(); |
611 | 12 | } |
612 | | |
613 | | // finish write inverted index, flush data to compound file |
614 | 16 | for (auto& writer_sign : inverted_index_writer_signs) { |
615 | 16 | try { |
616 | 16 | if (_index_column_writers[writer_sign]) { |
617 | 16 | RETURN_IF_ERROR(_index_column_writers[writer_sign]->finish()); |
618 | 16 | } |
619 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", { |
620 | 16 | _CLTHROWA(CL_ERR_IO, |
621 | 16 | "debug point: handle_single_rowset_index_build_finish_error"); |
622 | 16 | }) |
623 | 16 | } catch (const std::exception& e) { |
624 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
625 | 0 | "CLuceneError occured: {}", e.what()); |
626 | 0 | } |
627 | 16 | } |
628 | | |
629 | 12 | _olap_data_convertor->reset(); |
630 | 12 | } |
631 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
632 | 15 | auto st = index_file_writer->begin_close(); |
633 | 15 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", { |
634 | 15 | st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
635 | 15 | "debug point: handle_single_rowset_file_writer_close_error"); |
636 | 15 | }) |
637 | 15 | if (!st.ok()) { |
638 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
639 | 0 | return st; |
640 | 0 | } |
641 | 15 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
642 | 15 | } |
643 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
644 | 15 | auto st = index_file_writer->finish_close(); |
645 | 15 | if (!st.ok()) { |
646 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
647 | 0 | return st; |
648 | 0 | } |
649 | 15 | } |
650 | 12 | _index_column_writers.clear(); |
651 | 12 | _index_file_writers.clear(); |
652 | 12 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
653 | 12 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
654 | 12 | inverted_index_size); |
655 | 12 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
656 | 12 | inverted_index_size); |
657 | 12 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
658 | 12 | } |
659 | | |
660 | 12 | return Status::OK(); |
661 | 16 | } |
662 | | |
663 | | Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, int64_t segment_idx, |
664 | 12 | Block* block) { |
665 | 12 | VLOG_DEBUG << "begin to write inverted/ann index"; |
666 | | // converter block data |
667 | 12 | _olap_data_convertor->set_source_content(block, 0, block->rows()); |
668 | 28 | for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) { |
669 | 16 | auto inverted_index = _alter_inverted_indexes[i]; |
670 | 16 | auto index_id = inverted_index.index_id; |
671 | 16 | auto column_name = inverted_index.columns[0]; |
672 | 16 | auto column_idx = tablet_schema->field_index(column_name); |
673 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative", |
674 | 16 | { column_idx = -1; }) |
675 | 16 | if (column_idx < 0) { |
676 | 1 | if (!inverted_index.column_unique_ids.empty()) { |
677 | 1 | auto column_unique_id = inverted_index.column_unique_ids[0]; |
678 | 1 | column_idx = tablet_schema->field_index(column_unique_id); |
679 | 1 | } |
680 | 1 | if (column_idx < 0) { |
681 | 0 | LOG(WARNING) << "referenced column was missing. " |
682 | 0 | << "[column=" << column_name << " referenced_column=" << column_idx |
683 | 0 | << "]"; |
684 | 0 | continue; |
685 | 0 | } |
686 | 1 | } |
687 | 16 | auto column = tablet_schema->column(column_idx); |
688 | 16 | auto writer_sign = std::make_pair(segment_idx, index_id); |
689 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
690 | 16 | auto converted_result = _olap_data_convertor->convert_column_data(i); |
691 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", { |
692 | 16 | converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>( |
693 | 16 | "debug point: _write_inverted_index_data_convert_column_data_error"); |
694 | 16 | }) |
695 | 16 | if (converted_result.first != Status::OK()) { |
696 | 0 | LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first; |
697 | 0 | return converted_result.first; |
698 | 0 | } |
699 | 16 | const auto* ptr = (const uint8_t*)converted_result.second->get_data(); |
700 | 16 | const auto* null_map = converted_result.second->get_nullmap(); |
701 | 16 | if (null_map) { |
702 | 0 | RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(), null_map, &ptr, |
703 | 0 | block->rows())); |
704 | 16 | } else { |
705 | 16 | RETURN_IF_ERROR(_add_data(column_name, writer_sign, field.get(), &ptr, block->rows())); |
706 | 16 | } |
707 | 16 | } |
708 | 12 | _olap_data_convertor->clear_source_content(); |
709 | | |
710 | 12 | return Status::OK(); |
711 | 12 | } |
712 | | |
713 | | Status IndexBuilder::_add_nullable(const std::string& column_name, |
714 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
715 | | StorageField* field, const uint8_t* null_map, |
716 | 0 | const uint8_t** ptr, size_t num_rows) { |
717 | | // TODO: need to process null data for inverted index |
718 | 0 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
719 | 0 | DCHECK(field->get_sub_field_count() == 1); |
720 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
721 | 0 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
722 | | // total number length |
723 | 0 | auto offset_data = *(data_ptr + 1); |
724 | 0 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
725 | 0 | try { |
726 | 0 | auto data = *(data_ptr + 2); |
727 | 0 | auto nested_null_map = *(data_ptr + 3); |
728 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
729 | 0 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
730 | 0 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
731 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", { |
732 | 0 | _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error"); |
733 | 0 | }) |
734 | 0 | RETURN_IF_ERROR( |
735 | 0 | _index_column_writers[index_writer_sign]->add_array_nulls(null_map, num_rows)); |
736 | 0 | } catch (const std::exception& e) { |
737 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
738 | 0 | "CLuceneError occured: {}", e.what()); |
739 | 0 | } |
740 | | |
741 | 0 | return Status::OK(); |
742 | 0 | } |
743 | 0 | size_t offset = 0; |
744 | 0 | auto next_run_step = [&]() { |
745 | 0 | size_t step = 1; |
746 | 0 | for (auto i = offset + 1; i < num_rows; ++i) { |
747 | 0 | if (null_map[offset] == null_map[i]) { |
748 | 0 | step++; |
749 | 0 | } else { |
750 | 0 | break; |
751 | 0 | } |
752 | 0 | } |
753 | 0 | return step; |
754 | 0 | }; |
755 | 0 | try { |
756 | 0 | do { |
757 | 0 | auto step = next_run_step(); |
758 | 0 | if (null_map[offset]) { |
759 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_nulls( |
760 | 0 | static_cast<uint32_t>(step))); |
761 | 0 | } else { |
762 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, |
763 | 0 | *ptr, step)); |
764 | 0 | } |
765 | 0 | *ptr += field->size() * step; |
766 | 0 | offset += step; |
767 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception", |
768 | 0 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); }) |
769 | 0 | } while (offset < num_rows); |
770 | 0 | } catch (const std::exception& e) { |
771 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
772 | 0 | e.what()); |
773 | 0 | } |
774 | | |
775 | 0 | return Status::OK(); |
776 | 0 | } |
777 | | |
778 | | Status IndexBuilder::_add_data(const std::string& column_name, |
779 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
780 | 16 | StorageField* field, const uint8_t** ptr, size_t num_rows) { |
781 | 16 | try { |
782 | 16 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
783 | 2 | DCHECK(field->get_sub_field_count() == 1); |
784 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
785 | 2 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
786 | | // total number length |
787 | 2 | auto element_cnt = size_t((unsigned long)(*data_ptr)); |
788 | 2 | auto offset_data = *(data_ptr + 1); |
789 | 2 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
790 | 2 | if (element_cnt > 0) { |
791 | 2 | auto data = *(data_ptr + 2); |
792 | 2 | auto nested_null_map = *(data_ptr + 3); |
793 | 2 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
794 | 2 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
795 | 2 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
796 | 2 | } |
797 | 14 | } else { |
798 | 14 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, *ptr, |
799 | 14 | num_rows)); |
800 | 14 | } |
801 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception", |
802 | 16 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); }) |
803 | 16 | } catch (const std::exception& e) { |
804 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
805 | 0 | e.what()); |
806 | 0 | } |
807 | | |
808 | 16 | return Status::OK(); |
809 | 16 | } |
810 | | |
811 | 17 | Status IndexBuilder::handle_inverted_index_data() { |
812 | 17 | LOG(INFO) << "begin to handle_inverted_index_data"; |
813 | 17 | DCHECK(_input_rowsets.size() == _output_rowsets.size()); |
814 | 17 | for (auto& _output_rowset : _output_rowsets) { |
815 | 17 | SegmentCacheHandle segment_cache_handle; |
816 | 17 | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
817 | 17 | std::static_pointer_cast<BetaRowset>(_output_rowset), &segment_cache_handle)); |
818 | 17 | auto output_rowset_meta = _output_rowset->rowset_meta(); |
819 | 17 | auto& segments = segment_cache_handle.get_segments(); |
820 | 17 | RETURN_IF_ERROR(handle_single_rowset(output_rowset_meta, segments)); |
821 | 17 | } |
822 | 16 | return Status::OK(); |
823 | 17 | } |
824 | | |
825 | 21 | Status IndexBuilder::do_build_inverted_index() { |
826 | 21 | LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id() |
827 | 21 | << ", is_drop_op=" << _is_drop_op; |
828 | 21 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty", |
829 | 21 | { _alter_inverted_indexes.clear(); }) |
830 | 21 | if (_alter_inverted_indexes.empty()) { |
831 | 0 | return Status::OK(); |
832 | 0 | } |
833 | | |
834 | 21 | static constexpr long TRY_LOCK_TIMEOUT = 30; |
835 | 21 | std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::defer_lock); |
836 | 21 | bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT)); |
837 | | |
838 | 21 | if (!owns_lock) { |
839 | 0 | return Status::ObtainLockFailed( |
840 | 0 | "try schema_change_lock failed. There might be schema change or cooldown running " |
841 | 0 | "on " |
842 | 0 | "tablet={} ", |
843 | 0 | _tablet->tablet_id()); |
844 | 0 | } |
845 | | // Check executing serially with compaction task. |
846 | 21 | std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(), |
847 | 21 | std::try_to_lock); |
848 | 21 | if (!base_compaction_lock.owns_lock()) { |
849 | 0 | return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ", |
850 | 0 | _tablet->tablet_id()); |
851 | 0 | } |
852 | 21 | std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), |
853 | 21 | std::try_to_lock); |
854 | 21 | if (!cumu_compaction_lock.owns_lock()) { |
855 | 0 | return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}", |
856 | 0 | _tablet->tablet_id()); |
857 | 0 | } |
858 | | |
859 | 21 | std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(), |
860 | 21 | std::try_to_lock); |
861 | 21 | if (!cold_compaction_lock.owns_lock()) { |
862 | 0 | return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}", |
863 | 0 | _tablet->tablet_id()); |
864 | 0 | } |
865 | | |
866 | 21 | std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(), |
867 | 21 | std::try_to_lock); |
868 | 21 | if (!build_inverted_index_lock.owns_lock()) { |
869 | 0 | return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}", |
870 | 0 | _tablet->tablet_id()); |
871 | 0 | } |
872 | | |
873 | 21 | std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock); |
874 | 21 | if (!migration_rlock.owns_lock()) { |
875 | 0 | return Status::ObtainLockFailed("got migration_rlock failed. tablet={}", |
876 | 0 | _tablet->tablet_id()); |
877 | 0 | } |
878 | | |
879 | 21 | DCHECK(!_alter_index_ids.empty()); |
880 | 21 | _input_rowsets = |
881 | 21 | _tablet->pick_candidate_rowsets_to_build_inverted_index(_alter_index_ids, _is_drop_op); |
882 | 21 | if (_input_rowsets.empty()) { |
883 | 1 | LOG(INFO) << "_input_rowsets is empty"; |
884 | 1 | return Status::OK(); |
885 | 1 | } |
886 | | |
887 | 20 | auto st = update_inverted_index_info(); |
888 | 20 | if (!st.ok()) { |
889 | 3 | LOG(WARNING) << "failed to update_inverted_index_info. " |
890 | 3 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
891 | 3 | gc_output_rowset(); |
892 | 3 | return st; |
893 | 3 | } |
894 | | |
895 | | // create inverted index file for output rowset |
896 | 17 | st = handle_inverted_index_data(); |
897 | 17 | if (!st.ok()) { |
898 | 1 | LOG(WARNING) << "failed to handle_inverted_index_data. " |
899 | 1 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
900 | 1 | gc_output_rowset(); |
901 | 1 | return st; |
902 | 1 | } |
903 | | |
904 | | // modify rowsets in memory |
905 | 16 | st = modify_rowsets(); |
906 | 16 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", { |
907 | 16 | st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>( |
908 | 16 | "debug point: do_build_inverted_index_modify_rowsets_status_error"); |
909 | 16 | }) |
910 | 16 | if (!st.ok()) { |
911 | 0 | LOG(WARNING) << "failed to modify rowsets in memory. " |
912 | 0 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
913 | 0 | gc_output_rowset(); |
914 | 0 | return st; |
915 | 0 | } |
916 | 16 | return Status::OK(); |
917 | 16 | } |
918 | | |
919 | 16 | Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { |
920 | 16 | DCHECK(std::ranges::all_of( |
921 | 16 | _output_rowsets.begin(), _output_rowsets.end(), [&engine = _engine](auto&& rs) { |
922 | 16 | if (engine.check_rowset_id_in_unused_rowsets(rs->rowset_id())) { |
923 | 16 | LOG(ERROR) << "output rowset: " << rs->rowset_id() << " in unused rowsets"; |
924 | 16 | return false; |
925 | 16 | } |
926 | 16 | return true; |
927 | 16 | })); |
928 | | |
929 | 16 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
930 | 16 | _tablet->enable_unique_key_merge_on_write()) { |
931 | 1 | std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock()); |
932 | 1 | std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); |
933 | 1 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
934 | 1 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id()); |
935 | 2 | for (auto i = 0; i < _input_rowsets.size(); ++i) { |
936 | 1 | RowsetId input_rowset_id = _input_rowsets[i]->rowset_id(); |
937 | 1 | RowsetId output_rowset_id = _output_rowsets[i]->rowset_id(); |
938 | 1 | for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) { |
939 | 0 | RowsetId rs_id = std::get<0>(k); |
940 | 0 | if (rs_id == input_rowset_id) { |
941 | 0 | DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k), |
942 | 0 | std::get<2>(k)}; |
943 | 0 | auto res = delete_bitmap->set(output_rs_key, v); |
944 | 0 | DCHECK(res > 0) << "delete_bitmap set failed, res=" << res; |
945 | 0 | } |
946 | 0 | } |
947 | 1 | } |
948 | 1 | _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); |
949 | | |
950 | | // modify_rowsets will remove the delete_bitmap for input rowsets, |
951 | | // should call it after merge delete_bitmap |
952 | 1 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
953 | 15 | } else { |
954 | 15 | std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); |
955 | 15 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
956 | 15 | } |
957 | | |
958 | 16 | #ifndef BE_TEST |
959 | 16 | { |
960 | 16 | std::shared_lock rlock(_tablet->get_header_lock()); |
961 | 16 | _tablet->save_meta(); |
962 | 16 | } |
963 | 16 | #endif |
964 | 16 | return Status::OK(); |
965 | 16 | } |
966 | | |
967 | 4 | void IndexBuilder::gc_output_rowset() { |
968 | 4 | for (auto&& output_rowset : _output_rowsets) { |
969 | 2 | auto is_local_rowset = output_rowset->is_local(); |
970 | 2 | DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset", |
971 | 2 | { is_local_rowset = false; }) |
972 | 2 | if (!is_local_rowset) { |
973 | 0 | _tablet->record_unused_remote_rowset(output_rowset->rowset_id(), |
974 | 0 | output_rowset->rowset_meta()->resource_id(), |
975 | 0 | output_rowset->num_segments()); |
976 | 0 | return; |
977 | 0 | } |
978 | 2 | _engine.add_unused_rowset(output_rowset); |
979 | 2 | } |
980 | 4 | } |
981 | | |
982 | | #include "common/compile_check_end.h" |
983 | | } // namespace doris |