be/src/storage/task/index_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/task/index_builder.h" |
19 | | |
20 | | #include <mutex> |
21 | | |
22 | | #include "common/logging.h" |
23 | | #include "common/status.h" |
24 | | #include "storage/field.h" |
25 | | #include "storage/index/index_file_reader.h" |
26 | | #include "storage/index/index_file_writer.h" |
27 | | #include "storage/index/inverted/inverted_index_desc.h" |
28 | | #include "storage/index/inverted/inverted_index_fs_directory.h" |
29 | | #include "storage/olap_define.h" |
30 | | #include "storage/rowset/beta_rowset.h" |
31 | | #include "storage/rowset/rowset_writer_context.h" |
32 | | #include "storage/segment/segment_loader.h" |
33 | | #include "storage/storage_engine.h" |
34 | | #include "storage/tablet/tablet_schema.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/trace.h" |
37 | | |
38 | | namespace doris { |
39 | | |
40 | | IndexBuilder::IndexBuilder(StorageEngine& engine, TabletSharedPtr tablet, |
41 | | const std::vector<TColumn>& columns, |
42 | | const std::vector<doris::TOlapTableIndex>& alter_inverted_indexes, |
43 | | bool is_drop_op) |
44 | 22 | : _engine(engine), |
45 | 22 | _tablet(std::move(tablet)), |
46 | 22 | _columns(columns), |
47 | 22 | _alter_inverted_indexes(alter_inverted_indexes), |
48 | 22 | _is_drop_op(is_drop_op) { |
49 | 22 | _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>(); |
50 | 22 | } |
51 | | |
52 | 22 | IndexBuilder::~IndexBuilder() { |
53 | 22 | _olap_data_convertor.reset(); |
54 | 22 | _index_column_writers.clear(); |
55 | 22 | } |
56 | | |
57 | 22 | Status IndexBuilder::init() { |
58 | 24 | for (auto inverted_index : _alter_inverted_indexes) { |
59 | 24 | _alter_index_ids.insert(inverted_index.index_id); |
60 | 24 | } |
61 | 22 | return Status::OK(); |
62 | 22 | } |
63 | | |
64 | 18 | Status IndexBuilder::update_inverted_index_info() { |
65 | | // just do link files |
66 | 18 | LOG(INFO) << "begin to update_inverted_index_info, tablet=" << _tablet->tablet_id() |
67 | 18 | << ", is_drop_op=" << _is_drop_op; |
68 | | // index ids that will not be linked |
69 | 18 | std::set<int64_t> without_index_uids; |
70 | 18 | _output_rowsets.reserve(_input_rowsets.size()); |
71 | 18 | _pending_rs_guards.reserve(_input_rowsets.size()); |
72 | 18 | for (auto&& input_rowset : _input_rowsets) { |
73 | 18 | bool is_local_rowset = input_rowset->is_local(); |
74 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset", |
75 | 18 | { is_local_rowset = false; }) |
76 | 18 | if (!is_local_rowset) [[unlikely]] { |
77 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id(); |
78 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
79 | 0 | _tablet->tablet_id(), |
80 | 0 | input_rowset->rowset_id().to_string()); |
81 | 0 | } |
82 | | |
83 | 18 | TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>(); |
84 | 18 | const auto& input_rs_tablet_schema = input_rowset->tablet_schema(); |
85 | 18 | output_rs_tablet_schema->copy_from(*input_rs_tablet_schema); |
86 | 18 | int64_t total_index_size = 0; |
87 | 18 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); |
88 | 18 | auto size_st = beta_rowset->get_inverted_index_size(&total_index_size); |
89 | 18 | DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", { |
90 | 18 | size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed"); |
91 | 18 | }) |
92 | 18 | if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() && |
93 | 18 | !size_st.is<ErrorCode::NOT_FOUND>()) { |
94 | 0 | return size_st; |
95 | 0 | } |
96 | 18 | auto num_segments = input_rowset->num_segments(); |
97 | 18 | size_t drop_index_size = 0; |
98 | | |
99 | 18 | if (_is_drop_op) { |
100 | 4 | for (const auto& t_inverted_index : _alter_inverted_indexes) { |
101 | 4 | DCHECK_EQ(t_inverted_index.columns.size(), 1); |
102 | 4 | auto column_name = t_inverted_index.columns[0]; |
103 | 4 | auto column_idx = output_rs_tablet_schema->field_index(column_name); |
104 | 4 | if (column_idx < 0) { |
105 | 0 | if (!t_inverted_index.column_unique_ids.empty()) { |
106 | 0 | auto column_unique_id = t_inverted_index.column_unique_ids[0]; |
107 | 0 | column_idx = output_rs_tablet_schema->field_index(column_unique_id); |
108 | 0 | } |
109 | 0 | if (column_idx < 0) { |
110 | 0 | LOG(WARNING) << "referenced column was missing. " |
111 | 0 | << "[column=" << column_name |
112 | 0 | << " referenced_column=" << column_idx << "]"; |
113 | 0 | continue; |
114 | 0 | } |
115 | 0 | } |
116 | 4 | auto column = output_rs_tablet_schema->column(column_idx); |
117 | | |
118 | | // inverted index |
119 | 4 | auto index_metas = output_rs_tablet_schema->inverted_indexs(column); |
120 | 4 | for (const auto& index_meta : index_metas) { |
121 | | // Only drop the index that matches the requested index_id, |
122 | | // not all indexes on this column |
123 | 4 | if (index_meta->index_id() != t_inverted_index.index_id) { |
124 | 1 | continue; |
125 | 1 | } |
126 | 3 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
127 | 3 | InvertedIndexStorageFormatPB::V1) { |
128 | 1 | const auto& fs = io::global_local_filesystem(); |
129 | | |
130 | 2 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
131 | 1 | auto seg_path = local_segment_path( |
132 | 1 | _tablet->tablet_path(), input_rowset->rowset_id().to_string(), |
133 | 1 | seg_id); |
134 | 1 | auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( |
135 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
136 | 1 | index_meta->index_id(), index_meta->get_index_suffix()); |
137 | 1 | int64_t index_size = 0; |
138 | 1 | RETURN_IF_ERROR(fs->file_size(index_path, &index_size)); |
139 | 1 | VLOG_DEBUG << "inverted index file:" << index_path |
140 | 0 | << " size:" << index_size; |
141 | 1 | drop_index_size += index_size; |
142 | 1 | } |
143 | 1 | } |
144 | 3 | _dropped_inverted_indexes.push_back(*index_meta); |
145 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
146 | | // remove dropped index_meta from output rowset tablet schema |
147 | 3 | output_rs_tablet_schema->remove_index(index_meta->index_id()); |
148 | 3 | } |
149 | | |
150 | | // ann index |
151 | 4 | const auto* ann_index = output_rs_tablet_schema->ann_index(column); |
152 | 4 | if (!ann_index) { |
153 | 3 | continue; |
154 | 3 | } |
155 | | // Only drop the ann index that matches the requested index_id |
156 | 1 | if (ann_index->index_id() != t_inverted_index.index_id) { |
157 | 0 | continue; |
158 | 0 | } |
159 | 1 | DCHECK(output_rs_tablet_schema->get_inverted_index_storage_format() != |
160 | 1 | InvertedIndexStorageFormatPB::V1); |
161 | 1 | _dropped_inverted_indexes.push_back(*ann_index); |
162 | | // ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED. |
163 | | // remove dropped index_meta from output rowset tablet schema |
164 | 1 | output_rs_tablet_schema->remove_index(ann_index->index_id()); |
165 | 1 | } |
166 | | |
167 | 4 | DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", { |
168 | 4 | auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
169 | 4 | "index_builder.update_inverted_index_info.drop_index", "indexes_count", 0); |
170 | 4 | if (indexes_count < 0) { |
171 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
172 | 4 | "indexes count cannot be negative"); |
173 | 4 | } |
174 | 4 | auto indexes_size = output_rs_tablet_schema->inverted_indexes().size(); |
175 | 4 | if (indexes_count != indexes_size) { |
176 | 4 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
177 | 4 | "indexes count not equal to expected"); |
178 | 4 | } |
179 | 4 | }) |
180 | 14 | } else { |
181 | | // base on input rowset's tablet_schema to build |
182 | | // output rowset's tablet_schema which only add |
183 | | // the indexes specified in this build index request |
184 | 16 | for (auto t_inverted_index : _alter_inverted_indexes) { |
185 | 16 | TabletIndex index; |
186 | 16 | index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema); |
187 | 16 | auto column_uid = index.col_unique_ids()[0]; |
188 | 16 | if (column_uid < 0) { |
189 | 3 | LOG(WARNING) << "referenced column was missing. " |
190 | 3 | << "[column=" << t_inverted_index.columns[0] |
191 | 3 | << " referenced_column=" << column_uid << "]"; |
192 | 3 | continue; |
193 | 3 | } |
194 | 13 | const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid); |
195 | | |
196 | | // inverted index |
197 | 13 | auto exist_indexs = output_rs_tablet_schema->inverted_indexs(col); |
198 | 13 | for (const auto& exist_index : exist_indexs) { |
199 | 0 | if (exist_index->index_id() != index.index_id()) { |
200 | 0 | if (exist_index->is_same_except_id(&index)) { |
201 | 0 | LOG(WARNING) << fmt::format( |
202 | 0 | "column: {} has a exist inverted index, but the index id not " |
203 | 0 | "equal " |
204 | 0 | "request's index id, , exist index id: {}, request's index id: " |
205 | 0 | "{}, " |
206 | 0 | "remove exist index in new output_rs_tablet_schema", |
207 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
208 | 0 | without_index_uids.insert(exist_index->index_id()); |
209 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
210 | 0 | } |
211 | 0 | } |
212 | 0 | } |
213 | | |
214 | | // ann index |
215 | 13 | const auto* exist_index = output_rs_tablet_schema->ann_index(col); |
216 | 13 | if (exist_index && exist_index->index_id() != index.index_id()) { |
217 | 0 | if (exist_index->is_same_except_id(&index)) { |
218 | 0 | LOG(WARNING) << fmt::format( |
219 | 0 | "column: {} has a exist ann index, but the index id not " |
220 | 0 | "equal request's index id, , exist index id: {}, request's index " |
221 | 0 | "id: {}, remove exist index in new output_rs_tablet_schema", |
222 | 0 | column_uid, exist_index->index_id(), index.index_id()); |
223 | 0 | without_index_uids.insert(exist_index->index_id()); |
224 | 0 | output_rs_tablet_schema->remove_index(exist_index->index_id()); |
225 | 0 | } |
226 | 0 | } |
227 | | |
228 | 13 | output_rs_tablet_schema->append_index(std::move(index)); |
229 | 13 | } |
230 | 14 | } |
231 | | // construct input rowset reader |
232 | 18 | RowsetReaderSharedPtr input_rs_reader; |
233 | 18 | RETURN_IF_ERROR(input_rowset->create_reader(&input_rs_reader)); |
234 | | // construct output rowset writer |
235 | 18 | RowsetWriterContext context; |
236 | 18 | context.version = input_rs_reader->version(); |
237 | 18 | context.rowset_state = VISIBLE; |
238 | 18 | context.segments_overlap = input_rowset->rowset_meta()->segments_overlap(); |
239 | 18 | context.tablet_schema = output_rs_tablet_schema; |
240 | 18 | context.newest_write_timestamp = input_rs_reader->newest_write_timestamp(); |
241 | 18 | auto output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); |
242 | 18 | _pending_rs_guards.push_back(_engine.add_pending_rowset(context)); |
243 | | |
244 | | // if without_index_uids is not empty, copy _alter_index_ids to it |
245 | | // else just use _alter_index_ids to avoid copy |
246 | 18 | if (!without_index_uids.empty()) { |
247 | 0 | without_index_uids.insert(_alter_index_ids.begin(), _alter_index_ids.end()); |
248 | 0 | } |
249 | | |
250 | | // build output rowset |
251 | 18 | RETURN_IF_ERROR(input_rowset->link_files_to( |
252 | 18 | _tablet->tablet_path(), output_rs_writer->rowset_id(), 0, |
253 | 18 | without_index_uids.empty() ? &_alter_index_ids : &without_index_uids)); |
254 | | |
255 | 18 | auto input_rowset_meta = input_rowset->rowset_meta(); |
256 | 18 | RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>(); |
257 | 18 | rowset_meta->set_num_rows(input_rowset_meta->num_rows()); |
258 | 18 | if (output_rs_tablet_schema->get_inverted_index_storage_format() == |
259 | 18 | InvertedIndexStorageFormatPB::V1) { |
260 | 4 | if (_is_drop_op) { |
261 | 1 | VLOG_DEBUG << "data_disk_size:" << input_rowset_meta->data_disk_size() |
262 | 0 | << " total_disk_size:" << input_rowset_meta->total_disk_size() |
263 | 0 | << " index_disk_size:" << input_rowset_meta->index_disk_size() |
264 | 0 | << " drop_index_size:" << drop_index_size; |
265 | 1 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
266 | 1 | drop_index_size); |
267 | 1 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
268 | 1 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
269 | 1 | drop_index_size); |
270 | 3 | } else { |
271 | 3 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size()); |
272 | 3 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
273 | 3 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size()); |
274 | 3 | } |
275 | 14 | } else { |
276 | 31 | for (int seg_id = 0; seg_id < num_segments; seg_id++) { |
277 | 17 | auto seg_path = DORIS_TRY(input_rowset->segment_path(seg_id)); |
278 | 17 | auto idx_file_reader = std::make_unique<IndexFileReader>( |
279 | 17 | context.fs(), |
280 | 17 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, |
281 | 17 | output_rs_tablet_schema->get_inverted_index_storage_format(), |
282 | 17 | InvertedIndexFileInfo(), _tablet->tablet_id()); |
283 | 17 | auto st = idx_file_reader->init(); |
284 | 17 | DBUG_EXECUTE_IF( |
285 | 17 | "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", { |
286 | 17 | st = Status::Error<ErrorCode::INIT_FAILED>( |
287 | 17 | "debug point: reader init error"); |
288 | 17 | }) |
289 | 17 | if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) { |
290 | 0 | return st; |
291 | 0 | } |
292 | 17 | _index_file_readers.emplace( |
293 | 17 | std::make_pair(output_rs_writer->rowset_id().to_string(), seg_id), |
294 | 17 | std::move(idx_file_reader)); |
295 | 17 | } |
296 | 14 | rowset_meta->set_total_disk_size(input_rowset_meta->total_disk_size() - |
297 | 14 | total_index_size); |
298 | 14 | rowset_meta->set_data_disk_size(input_rowset_meta->data_disk_size()); |
299 | 14 | rowset_meta->set_index_disk_size(input_rowset_meta->index_disk_size() - |
300 | 14 | total_index_size); |
301 | 14 | } |
302 | 18 | rowset_meta->set_empty(input_rowset_meta->empty()); |
303 | 18 | rowset_meta->set_num_segments(input_rowset_meta->num_segments()); |
304 | 18 | rowset_meta->set_segments_overlap(input_rowset_meta->segments_overlap()); |
305 | 18 | rowset_meta->set_rowset_state(input_rowset_meta->rowset_state()); |
306 | 18 | std::vector<KeyBoundsPB> key_bounds; |
307 | 18 | RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds)); |
308 | 18 | rowset_meta->set_segments_key_bounds_truncated( |
309 | 18 | input_rowset_meta->is_segments_key_bounds_truncated()); |
310 | 18 | rowset_meta->set_segments_key_bounds_aggregated( |
311 | 18 | input_rowset_meta->is_segments_key_bounds_aggregated()); |
312 | | // key_bounds is already aggregated (size 1) or per-segment; copy verbatim. |
313 | 18 | rowset_meta->set_segments_key_bounds(key_bounds); |
314 | 18 | std::vector<uint32_t> num_segment_rows; |
315 | 18 | input_rowset_meta->get_num_segment_rows(&num_segment_rows); |
316 | 18 | rowset_meta->set_num_segment_rows(num_segment_rows); |
317 | 18 | auto output_rowset = output_rs_writer->manual_build(rowset_meta); |
318 | 18 | if (input_rowset_meta->has_delete_predicate()) { |
319 | 0 | output_rowset->rowset_meta()->set_delete_predicate( |
320 | 0 | input_rowset_meta->delete_predicate()); |
321 | 0 | } |
322 | 18 | _output_rowsets.push_back(output_rowset); |
323 | 18 | } |
324 | | |
325 | 18 | return Status::OK(); |
326 | 18 | } |
327 | | |
328 | | Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta, |
329 | 16 | std::vector<segment_v2::SegmentSharedPtr>& segments) { |
330 | 16 | bool is_local_rowset = output_rowset_meta->is_local(); |
331 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset", |
332 | 16 | { is_local_rowset = false; }) |
333 | 16 | if (!is_local_rowset) [[unlikely]] { |
334 | | // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id(); |
335 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
336 | 0 | _tablet->tablet_id(), |
337 | 0 | output_rowset_meta->rowset_id().to_string()); |
338 | 0 | } |
339 | | |
340 | 16 | if (_is_drop_op) { |
341 | 4 | const auto& output_rs_tablet_schema = output_rowset_meta->tablet_schema(); |
342 | 4 | if (output_rs_tablet_schema->get_inverted_index_storage_format() != |
343 | 4 | InvertedIndexStorageFormatPB::V1) { |
344 | 3 | const auto& fs = output_rowset_meta->fs(); |
345 | | |
346 | 3 | const auto& output_rowset_schema = output_rowset_meta->tablet_schema(); |
347 | 3 | size_t inverted_index_size = 0; |
348 | 3 | for (auto& seg_ptr : segments) { |
349 | 3 | auto idx_file_reader_iter = _index_file_readers.find( |
350 | 3 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
351 | 3 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op", |
352 | 3 | { idx_file_reader_iter = _index_file_readers.end(); }) |
353 | 3 | if (idx_file_reader_iter == _index_file_readers.end()) { |
354 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
355 | 0 | << seg_ptr->id() << " cannot be found"; |
356 | 0 | continue; |
357 | 0 | } |
358 | 3 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
359 | | |
360 | 3 | std::string index_path_prefix { |
361 | 3 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
362 | 3 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
363 | 3 | seg_ptr->id()))}; |
364 | | |
365 | 3 | std::string index_path = |
366 | 3 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
367 | 3 | io::FileWriterPtr file_writer; |
368 | 3 | Status st = fs->create_file(index_path, &file_writer); |
369 | 3 | if (!st.ok()) { |
370 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
371 | 0 | << ", err: " << st; |
372 | 0 | return st; |
373 | 0 | } |
374 | 3 | auto index_file_writer = std::make_unique<IndexFileWriter>( |
375 | 3 | fs, std::move(index_path_prefix), |
376 | 3 | output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), |
377 | 3 | output_rowset_schema->get_inverted_index_storage_format(), |
378 | 3 | std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); |
379 | 3 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
380 | | // create inverted index writer |
381 | 3 | for (auto& index_meta : _dropped_inverted_indexes) { |
382 | 3 | RETURN_IF_ERROR(index_file_writer->delete_index(&index_meta)); |
383 | 3 | } |
384 | 3 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
385 | 3 | } |
386 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
387 | 3 | auto st = index_file_writer->begin_close(); |
388 | 3 | if (!st.ok()) { |
389 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
390 | 0 | return st; |
391 | 0 | } |
392 | 3 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
393 | 3 | } |
394 | 3 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
395 | 3 | auto st = index_file_writer->finish_close(); |
396 | 3 | if (!st.ok()) { |
397 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
398 | 0 | return st; |
399 | 0 | } |
400 | 3 | } |
401 | 3 | _index_file_writers.clear(); |
402 | 3 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
403 | 3 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
404 | 3 | inverted_index_size); |
405 | 3 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
406 | 3 | inverted_index_size); |
407 | 3 | } |
408 | 4 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
409 | 4 | return Status::OK(); |
410 | 12 | } else { |
411 | | // create inverted or ann index writer |
412 | 12 | const auto& fs = output_rowset_meta->fs(); |
413 | 12 | auto output_rowset_schema = output_rowset_meta->tablet_schema(); |
414 | 12 | size_t inverted_index_size = 0; |
415 | 15 | for (auto& seg_ptr : segments) { |
416 | 15 | std::string index_path_prefix { |
417 | 15 | InvertedIndexDescriptor::get_index_file_path_prefix(local_segment_path( |
418 | 15 | _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), |
419 | 15 | seg_ptr->id()))}; |
420 | 15 | std::vector<ColumnId> return_columns; |
421 | 15 | std::vector<std::pair<int64_t, int64_t>> inverted_index_writer_signs; |
422 | 15 | _olap_data_convertor->reserve(_alter_inverted_indexes.size()); |
423 | | |
424 | 15 | std::unique_ptr<IndexFileWriter> index_file_writer = nullptr; |
425 | 15 | if (output_rowset_schema->get_inverted_index_storage_format() >= |
426 | 15 | InvertedIndexStorageFormatPB::V2) { |
427 | 12 | auto idx_file_reader_iter = _index_file_readers.find( |
428 | 12 | std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); |
429 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader", |
430 | 12 | { idx_file_reader_iter = _index_file_readers.end(); }) |
431 | 12 | if (idx_file_reader_iter == _index_file_readers.end()) { |
432 | 0 | LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" |
433 | 0 | << seg_ptr->id() << " cannot be found"; |
434 | 0 | continue; |
435 | 0 | } |
436 | 12 | std::string index_path = |
437 | 12 | InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); |
438 | 12 | io::FileWriterPtr file_writer; |
439 | 12 | Status st = fs->create_file(index_path, &file_writer); |
440 | 12 | if (!st.ok()) { |
441 | 0 | LOG(WARNING) << "failed to create writable file. path=" << index_path |
442 | 0 | << ", err: " << st; |
443 | 0 | return st; |
444 | 0 | } |
445 | 12 | auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); |
446 | 12 | index_file_writer = std::make_unique<IndexFileWriter>( |
447 | 12 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
448 | 12 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), |
449 | 12 | std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); |
450 | 12 | RETURN_IF_ERROR(index_file_writer->initialize(dirs)); |
451 | 12 | } else { |
452 | 3 | index_file_writer = std::make_unique<IndexFileWriter>( |
453 | 3 | fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), |
454 | 3 | seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), |
455 | 3 | nullptr, true /* can_use_ram_dir */, _tablet->tablet_id()); |
456 | 3 | } |
457 | | // create inverted index writer, or ann index writer |
458 | 19 | for (auto inverted_index : _alter_inverted_indexes) { |
459 | 19 | DCHECK(inverted_index.index_type == TIndexType::INVERTED || |
460 | 19 | inverted_index.index_type == TIndexType::ANN); |
461 | 19 | DCHECK_EQ(inverted_index.columns.size(), 1); |
462 | 19 | auto index_id = inverted_index.index_id; |
463 | 19 | auto column_name = inverted_index.columns[0]; |
464 | 19 | auto column_idx = output_rowset_schema->field_index(column_name); |
465 | 19 | if (column_idx < 0) { |
466 | 4 | if (inverted_index.__isset.column_unique_ids && |
467 | 4 | !inverted_index.column_unique_ids.empty()) { |
468 | 1 | column_idx = output_rowset_schema->field_index( |
469 | 1 | inverted_index.column_unique_ids[0]); |
470 | 1 | } |
471 | 4 | if (column_idx < 0) { |
472 | 3 | LOG(WARNING) << "referenced column was missing. " |
473 | 3 | << "[column=" << column_name |
474 | 3 | << " referenced_column=" << column_idx << "]"; |
475 | 3 | continue; |
476 | 3 | } |
477 | 4 | } |
478 | 16 | auto column = output_rowset_schema->column(column_idx); |
479 | | // variant column is not support for building index |
480 | 16 | auto is_support_inverted_index = |
481 | 16 | IndexColumnWriter::check_support_inverted_index(column); |
482 | 16 | auto is_support_ann_index = IndexColumnWriter::check_support_ann_index(column); |
483 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index", |
484 | 16 | { is_support_inverted_index = false; }) |
485 | 16 | if (!is_support_inverted_index && !is_support_ann_index) { |
486 | 0 | continue; |
487 | 0 | } |
488 | 16 | DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id)); |
489 | 16 | _olap_data_convertor->add_column_data_convertor(column); |
490 | 16 | return_columns.emplace_back(column_idx); |
491 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
492 | | |
493 | 16 | if (inverted_index.index_type == TIndexType::INVERTED) { |
494 | | // inverted index |
495 | 15 | auto index_metas = output_rowset_schema->inverted_indexs(column); |
496 | 15 | for (const auto& index_meta : index_metas) { |
497 | 15 | if (index_meta->index_id() != index_id) { |
498 | 0 | continue; |
499 | 0 | } |
500 | 15 | std::unique_ptr<segment_v2::IndexColumnWriter> inverted_index_builder; |
501 | 15 | try { |
502 | 15 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
503 | 15 | field.get(), &inverted_index_builder, index_file_writer.get(), |
504 | 15 | index_meta)); |
505 | 15 | DBUG_EXECUTE_IF( |
506 | 15 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
507 | 15 | "error", |
508 | 15 | { |
509 | 15 | _CLTHROWA(CL_ERR_IO, |
510 | 15 | "debug point: " |
511 | 15 | "handle_single_rowset_index_column_writer_create_" |
512 | 15 | "error"); |
513 | 15 | }) |
514 | 15 | } catch (const std::exception& e) { |
515 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
516 | 0 | "CLuceneError occured: {}", e.what()); |
517 | 0 | } |
518 | | |
519 | 15 | if (inverted_index_builder) { |
520 | 15 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
521 | 15 | _index_column_writers.insert( |
522 | 15 | std::make_pair(writer_sign, std::move(inverted_index_builder))); |
523 | 15 | inverted_index_writer_signs.emplace_back(writer_sign); |
524 | 15 | } |
525 | 15 | } |
526 | 15 | } else if (inverted_index.index_type == TIndexType::ANN) { |
527 | | // ann index |
528 | 1 | const auto* index_meta = output_rowset_schema->ann_index(column); |
529 | 1 | if (index_meta && index_meta->index_id() == index_id) { |
530 | 1 | std::unique_ptr<segment_v2::IndexColumnWriter> index_writer; |
531 | 1 | try { |
532 | 1 | RETURN_IF_ERROR(segment_v2::IndexColumnWriter::create( |
533 | 1 | field.get(), &index_writer, index_file_writer.get(), |
534 | 1 | index_meta)); |
535 | 1 | DBUG_EXECUTE_IF( |
536 | 1 | "IndexBuilder::handle_single_rowset_index_column_writer_create_" |
537 | 1 | "error", |
538 | 1 | { |
539 | 1 | _CLTHROWA(CL_ERR_IO, |
540 | 1 | "debug point: " |
541 | 1 | "handle_single_rowset_index_column_writer_create_" |
542 | 1 | "error"); |
543 | 1 | }) |
544 | 1 | } catch (const std::exception& e) { |
545 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
546 | 0 | "CLuceneError occured: {}", e.what()); |
547 | 0 | } |
548 | | |
549 | 1 | if (index_writer) { |
550 | 1 | auto writer_sign = std::make_pair(seg_ptr->id(), index_id); |
551 | 1 | _index_column_writers.insert( |
552 | 1 | std::make_pair(writer_sign, std::move(index_writer))); |
553 | 1 | inverted_index_writer_signs.emplace_back(writer_sign); |
554 | 1 | } |
555 | 1 | } |
556 | 1 | } |
557 | 16 | } |
558 | | |
559 | | // DO NOT forget index_file_writer for the segment, otherwise, original inverted index will be deleted. |
560 | 15 | _index_file_writers.emplace(seg_ptr->id(), std::move(index_file_writer)); |
561 | 15 | if (return_columns.empty()) { |
562 | | // no columns to read |
563 | 3 | continue; |
564 | 3 | } |
565 | | // create iterator for each segment |
566 | 12 | StorageReadOptions read_options; |
567 | 12 | OlapReaderStatistics stats; |
568 | 12 | read_options.stats = &stats; |
569 | 12 | read_options.tablet_schema = output_rowset_schema; |
570 | 12 | std::shared_ptr<Schema> schema = |
571 | 12 | std::make_shared<Schema>(output_rowset_schema->columns(), return_columns); |
572 | 12 | std::unique_ptr<RowwiseIterator> iter; |
573 | 12 | auto res = seg_ptr->new_iterator(schema, read_options, &iter); |
574 | 12 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", { |
575 | 12 | res = Status::Error<ErrorCode::INTERNAL_ERROR>( |
576 | 12 | "debug point: handle_single_rowset_create_iterator_error"); |
577 | 12 | }) |
578 | 12 | if (!res.ok()) { |
579 | 0 | LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() |
580 | 0 | << "]: " << res.to_string(); |
581 | 0 | return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string()); |
582 | 0 | } |
583 | | |
584 | 12 | auto block = Block::create_unique(output_rowset_schema->create_block(return_columns)); |
585 | 24 | while (true) { |
586 | 24 | auto status = iter->next_batch(block.get()); |
587 | 24 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", { |
588 | 24 | status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
589 | 24 | "next_batch fault injection"); |
590 | 24 | }); |
591 | 24 | if (!status.ok()) { |
592 | 12 | if (status.is<ErrorCode::END_OF_FILE>()) { |
593 | 12 | break; |
594 | 12 | } |
595 | 12 | LOG(WARNING) |
596 | 0 | << "failed to read next block when schema change for inverted index." |
597 | 0 | << ", err=" << status.to_string(); |
598 | 0 | return status; |
599 | 12 | } |
600 | | |
601 | | // write inverted index data, or ann index data |
602 | 12 | status = _write_inverted_index_data(output_rowset_schema, iter->data_id(), |
603 | 12 | block.get()); |
604 | 12 | DBUG_EXECUTE_IF( |
605 | 12 | "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", { |
606 | 12 | status = Status::Error<ErrorCode::INTERNAL_ERROR>( |
607 | 12 | "debug point: " |
608 | 12 | "handle_single_rowset_write_inverted_index_data_error"); |
609 | 12 | }) |
610 | 12 | if (!status.ok()) { |
611 | 0 | return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( |
612 | 0 | "failed to write block."); |
613 | 0 | } |
614 | 12 | block->clear_column_data(); |
615 | 12 | } |
616 | | |
617 | | // finish write inverted index, flush data to compound file |
618 | 16 | for (auto& writer_sign : inverted_index_writer_signs) { |
619 | 16 | try { |
620 | 16 | if (_index_column_writers[writer_sign]) { |
621 | 16 | RETURN_IF_ERROR(_index_column_writers[writer_sign]->finish()); |
622 | 16 | } |
623 | 16 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", { |
624 | 16 | _CLTHROWA(CL_ERR_IO, |
625 | 16 | "debug point: handle_single_rowset_index_build_finish_error"); |
626 | 16 | }) |
627 | 16 | } catch (const std::exception& e) { |
628 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
629 | 0 | "CLuceneError occured: {}", e.what()); |
630 | 0 | } |
631 | 16 | } |
632 | | |
633 | 12 | _olap_data_convertor->reset(); |
634 | 12 | } |
635 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
636 | 15 | auto st = index_file_writer->begin_close(); |
637 | 15 | DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", { |
638 | 15 | st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
639 | 15 | "debug point: handle_single_rowset_file_writer_close_error"); |
640 | 15 | }) |
641 | 15 | if (!st.ok()) { |
642 | 0 | LOG(ERROR) << "close index_file_writer error:" << st; |
643 | 0 | return st; |
644 | 0 | } |
645 | 15 | inverted_index_size += index_file_writer->get_index_file_total_size(); |
646 | 15 | } |
647 | 15 | for (auto&& [seg_id, index_file_writer] : _index_file_writers) { |
648 | 15 | auto st = index_file_writer->finish_close(); |
649 | 15 | if (!st.ok()) { |
650 | 0 | LOG(ERROR) << "wait close index_file_writer error:" << st; |
651 | 0 | return st; |
652 | 0 | } |
653 | 15 | } |
654 | 12 | _index_column_writers.clear(); |
655 | 12 | _index_file_writers.clear(); |
656 | 12 | output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size()); |
657 | 12 | output_rowset_meta->set_total_disk_size(output_rowset_meta->total_disk_size() + |
658 | 12 | inverted_index_size); |
659 | 12 | output_rowset_meta->set_index_disk_size(output_rowset_meta->index_disk_size() + |
660 | 12 | inverted_index_size); |
661 | 12 | LOG(INFO) << "all row nums. source_rows=" << output_rowset_meta->num_rows(); |
662 | 12 | } |
663 | | |
664 | 12 | return Status::OK(); |
665 | 16 | } |
666 | | |
667 | | Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, int64_t segment_idx, |
668 | 12 | Block* block) { |
669 | 12 | VLOG_DEBUG << "begin to write inverted/ann index"; |
670 | | // converter block data |
671 | 12 | _olap_data_convertor->set_source_content(block, 0, block->rows()); |
672 | 28 | for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) { |
673 | 16 | auto inverted_index = _alter_inverted_indexes[i]; |
674 | 16 | auto index_id = inverted_index.index_id; |
675 | 16 | auto column_name = inverted_index.columns[0]; |
676 | 16 | auto column_idx = tablet_schema->field_index(column_name); |
677 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative", |
678 | 16 | { column_idx = -1; }) |
679 | 16 | if (column_idx < 0) { |
680 | 1 | if (!inverted_index.column_unique_ids.empty()) { |
681 | 1 | auto column_unique_id = inverted_index.column_unique_ids[0]; |
682 | 1 | column_idx = tablet_schema->field_index(column_unique_id); |
683 | 1 | } |
684 | 1 | if (column_idx < 0) { |
685 | 0 | LOG(WARNING) << "referenced column was missing. " |
686 | 0 | << "[column=" << column_name << " referenced_column=" << column_idx |
687 | 0 | << "]"; |
688 | 0 | continue; |
689 | 0 | } |
690 | 1 | } |
691 | 16 | auto column = tablet_schema->column(column_idx); |
692 | 16 | auto writer_sign = std::make_pair(segment_idx, index_id); |
693 | 16 | std::unique_ptr<StorageField> field(StorageFieldFactory::create(column)); |
694 | 16 | auto converted_result = _olap_data_convertor->convert_column_data(i); |
695 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", { |
696 | 16 | converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>( |
697 | 16 | "debug point: _write_inverted_index_data_convert_column_data_error"); |
698 | 16 | }) |
699 | 16 | if (converted_result.first != Status::OK()) { |
700 | 0 | LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first; |
701 | 0 | return converted_result.first; |
702 | 0 | } |
703 | 16 | const auto* ptr = (const uint8_t*)converted_result.second->get_data(); |
704 | 16 | const auto* null_map = converted_result.second->get_nullmap(); |
705 | 16 | if (null_map) { |
706 | 0 | RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(), null_map, &ptr, |
707 | 0 | block->rows())); |
708 | 16 | } else { |
709 | 16 | RETURN_IF_ERROR(_add_data(column_name, writer_sign, field.get(), &ptr, block->rows())); |
710 | 16 | } |
711 | 16 | } |
712 | 12 | _olap_data_convertor->clear_source_content(); |
713 | | |
714 | 12 | return Status::OK(); |
715 | 12 | } |
716 | | |
717 | | Status IndexBuilder::_add_nullable(const std::string& column_name, |
718 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
719 | | StorageField* field, const uint8_t* null_map, |
720 | 0 | const uint8_t** ptr, size_t num_rows) { |
721 | | // TODO: need to process null data for inverted index |
722 | 0 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
723 | 0 | DCHECK(field->get_sub_field_count() == 1); |
724 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
725 | 0 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
726 | | // total number length |
727 | 0 | auto offset_data = *(data_ptr + 1); |
728 | 0 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
729 | 0 | try { |
730 | 0 | auto data = *(data_ptr + 2); |
731 | 0 | auto nested_null_map = *(data_ptr + 3); |
732 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
733 | 0 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
734 | 0 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
735 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", { |
736 | 0 | _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error"); |
737 | 0 | }) |
738 | 0 | RETURN_IF_ERROR( |
739 | 0 | _index_column_writers[index_writer_sign]->add_array_nulls(null_map, num_rows)); |
740 | 0 | } catch (const std::exception& e) { |
741 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
742 | 0 | "CLuceneError occured: {}", e.what()); |
743 | 0 | } |
744 | | |
745 | 0 | return Status::OK(); |
746 | 0 | } |
747 | 0 | size_t offset = 0; |
748 | 0 | auto next_run_step = [&]() { |
749 | 0 | size_t step = 1; |
750 | 0 | for (auto i = offset + 1; i < num_rows; ++i) { |
751 | 0 | if (null_map[offset] == null_map[i]) { |
752 | 0 | step++; |
753 | 0 | } else { |
754 | 0 | break; |
755 | 0 | } |
756 | 0 | } |
757 | 0 | return step; |
758 | 0 | }; |
759 | 0 | try { |
760 | 0 | do { |
761 | 0 | auto step = next_run_step(); |
762 | 0 | if (null_map[offset]) { |
763 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_nulls( |
764 | 0 | static_cast<uint32_t>(step))); |
765 | 0 | } else { |
766 | 0 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, |
767 | 0 | *ptr, step)); |
768 | 0 | } |
769 | 0 | *ptr += field->size() * step; |
770 | 0 | offset += step; |
771 | 0 | DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception", |
772 | 0 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); }) |
773 | 0 | } while (offset < num_rows); |
774 | 0 | } catch (const std::exception& e) { |
775 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
776 | 0 | e.what()); |
777 | 0 | } |
778 | | |
779 | 0 | return Status::OK(); |
780 | 0 | } |
781 | | |
782 | | Status IndexBuilder::_add_data(const std::string& column_name, |
783 | | const std::pair<int64_t, int64_t>& index_writer_sign, |
784 | 16 | StorageField* field, const uint8_t** ptr, size_t num_rows) { |
785 | 16 | try { |
786 | 16 | if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
787 | 2 | DCHECK(field->get_sub_field_count() == 1); |
788 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
789 | 2 | const auto* data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
790 | | // total number length |
791 | 2 | auto element_cnt = size_t((unsigned long)(*data_ptr)); |
792 | 2 | auto offset_data = *(data_ptr + 1); |
793 | 2 | const auto* offsets_ptr = (const uint8_t*)offset_data; |
794 | 2 | if (element_cnt > 0) { |
795 | 2 | auto data = *(data_ptr + 2); |
796 | 2 | auto nested_null_map = *(data_ptr + 3); |
797 | 2 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_array_values( |
798 | 2 | field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), |
799 | 2 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
800 | 2 | } |
801 | 14 | } else { |
802 | 14 | RETURN_IF_ERROR(_index_column_writers[index_writer_sign]->add_values(column_name, *ptr, |
803 | 14 | num_rows)); |
804 | 14 | } |
805 | 16 | DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception", |
806 | 16 | { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); }) |
807 | 16 | } catch (const std::exception& e) { |
808 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", |
809 | 0 | e.what()); |
810 | 0 | } |
811 | | |
812 | 16 | return Status::OK(); |
813 | 16 | } |
814 | | |
815 | 17 | Status IndexBuilder::handle_inverted_index_data() { |
816 | 17 | LOG(INFO) << "begin to handle_inverted_index_data"; |
817 | 17 | DCHECK(_input_rowsets.size() == _output_rowsets.size()); |
818 | 17 | for (auto& _output_rowset : _output_rowsets) { |
819 | 17 | SegmentCacheHandle segment_cache_handle; |
820 | 17 | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
821 | 17 | std::static_pointer_cast<BetaRowset>(_output_rowset), &segment_cache_handle)); |
822 | 17 | auto output_rowset_meta = _output_rowset->rowset_meta(); |
823 | 17 | auto& segments = segment_cache_handle.get_segments(); |
824 | 17 | RETURN_IF_ERROR(handle_single_rowset(output_rowset_meta, segments)); |
825 | 17 | } |
826 | 16 | return Status::OK(); |
827 | 17 | } |
828 | | |
829 | 21 | Status IndexBuilder::do_build_inverted_index() { |
830 | 21 | LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id() |
831 | 21 | << ", is_drop_op=" << _is_drop_op; |
832 | 21 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty", |
833 | 21 | { _alter_inverted_indexes.clear(); }) |
834 | 21 | if (_alter_inverted_indexes.empty()) { |
835 | 0 | return Status::OK(); |
836 | 0 | } |
837 | | |
838 | 21 | static constexpr long TRY_LOCK_TIMEOUT = 30; |
839 | 21 | std::unique_lock schema_change_lock(_tablet->get_schema_change_lock(), std::defer_lock); |
840 | 21 | bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT)); |
841 | | |
842 | 21 | if (!owns_lock) { |
843 | 0 | return Status::ObtainLockFailed( |
844 | 0 | "try schema_change_lock failed. There might be schema change or cooldown running " |
845 | 0 | "on " |
846 | 0 | "tablet={} ", |
847 | 0 | _tablet->tablet_id()); |
848 | 0 | } |
849 | | // Check executing serially with compaction task. |
850 | 21 | std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(), |
851 | 21 | std::try_to_lock); |
852 | 21 | if (!base_compaction_lock.owns_lock()) { |
853 | 0 | return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ", |
854 | 0 | _tablet->tablet_id()); |
855 | 0 | } |
856 | 21 | std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), |
857 | 21 | std::try_to_lock); |
858 | 21 | if (!cumu_compaction_lock.owns_lock()) { |
859 | 0 | return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}", |
860 | 0 | _tablet->tablet_id()); |
861 | 0 | } |
862 | | |
863 | 21 | std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(), |
864 | 21 | std::try_to_lock); |
865 | 21 | if (!cold_compaction_lock.owns_lock()) { |
866 | 0 | return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}", |
867 | 0 | _tablet->tablet_id()); |
868 | 0 | } |
869 | | |
870 | 21 | std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(), |
871 | 21 | std::try_to_lock); |
872 | 21 | if (!build_inverted_index_lock.owns_lock()) { |
873 | 0 | return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}", |
874 | 0 | _tablet->tablet_id()); |
875 | 0 | } |
876 | | |
877 | 21 | std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock); |
878 | 21 | if (!migration_rlock.owns_lock()) { |
879 | 0 | return Status::ObtainLockFailed("got migration_rlock failed. tablet={}", |
880 | 0 | _tablet->tablet_id()); |
881 | 0 | } |
882 | | |
883 | 21 | DCHECK(!_alter_index_ids.empty()); |
884 | 21 | _input_rowsets = |
885 | 21 | _tablet->pick_candidate_rowsets_to_build_inverted_index(_alter_index_ids, _is_drop_op); |
886 | 21 | if (_input_rowsets.empty()) { |
887 | 1 | LOG(INFO) << "_input_rowsets is empty"; |
888 | 1 | return Status::OK(); |
889 | 1 | } |
890 | | |
891 | 20 | auto st = update_inverted_index_info(); |
892 | 20 | if (!st.ok()) { |
893 | 3 | LOG(WARNING) << "failed to update_inverted_index_info. " |
894 | 3 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
895 | 3 | gc_output_rowset(); |
896 | 3 | return st; |
897 | 3 | } |
898 | | |
899 | | // create inverted index file for output rowset |
900 | 17 | st = handle_inverted_index_data(); |
901 | 17 | if (!st.ok()) { |
902 | 1 | LOG(WARNING) << "failed to handle_inverted_index_data. " |
903 | 1 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
904 | 1 | gc_output_rowset(); |
905 | 1 | return st; |
906 | 1 | } |
907 | | |
908 | | // modify rowsets in memory |
909 | 16 | st = modify_rowsets(); |
910 | 16 | DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", { |
911 | 16 | st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>( |
912 | 16 | "debug point: do_build_inverted_index_modify_rowsets_status_error"); |
913 | 16 | }) |
914 | 16 | if (!st.ok()) { |
915 | 0 | LOG(WARNING) << "failed to modify rowsets in memory. " |
916 | 0 | << "tablet=" << _tablet->tablet_id() << ", error=" << st; |
917 | 0 | gc_output_rowset(); |
918 | 0 | return st; |
919 | 0 | } |
920 | 16 | return Status::OK(); |
921 | 16 | } |
922 | | |
923 | 16 | Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { |
924 | 16 | DCHECK(std::ranges::all_of( |
925 | 16 | _output_rowsets.begin(), _output_rowsets.end(), [&engine = _engine](auto&& rs) { |
926 | 16 | if (engine.check_rowset_id_in_unused_rowsets(rs->rowset_id())) { |
927 | 16 | LOG(ERROR) << "output rowset: " << rs->rowset_id() << " in unused rowsets"; |
928 | 16 | return false; |
929 | 16 | } |
930 | 16 | return true; |
931 | 16 | })); |
932 | | |
933 | 16 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
934 | 16 | _tablet->enable_unique_key_merge_on_write()) { |
935 | 1 | std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock()); |
936 | 1 | std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); |
937 | 1 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
938 | 1 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id()); |
939 | 2 | for (auto i = 0; i < _input_rowsets.size(); ++i) { |
940 | 1 | RowsetId input_rowset_id = _input_rowsets[i]->rowset_id(); |
941 | 1 | RowsetId output_rowset_id = _output_rowsets[i]->rowset_id(); |
942 | 1 | for (const auto& [k, v] : _tablet->tablet_meta()->delete_bitmap().delete_bitmap) { |
943 | 0 | RowsetId rs_id = std::get<0>(k); |
944 | 0 | if (rs_id == input_rowset_id) { |
945 | 0 | DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id, std::get<1>(k), |
946 | 0 | std::get<2>(k)}; |
947 | 0 | auto res = delete_bitmap->set(output_rs_key, v); |
948 | 0 | DCHECK(res > 0) << "delete_bitmap set failed, res=" << res; |
949 | 0 | } |
950 | 0 | } |
951 | 1 | } |
952 | 1 | _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); |
953 | | |
954 | | // modify_rowsets will remove the delete_bitmap for input rowsets, |
955 | | // should call it after merge delete_bitmap |
956 | 1 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
957 | 15 | } else { |
958 | 15 | std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); |
959 | 15 | RETURN_IF_ERROR(_tablet->modify_rowsets(_output_rowsets, _input_rowsets, true)); |
960 | 15 | } |
961 | | |
962 | | #ifndef BE_TEST |
963 | | { |
964 | | std::shared_lock rlock(_tablet->get_header_lock()); |
965 | | _tablet->save_meta(); |
966 | | } |
967 | | #endif |
968 | 16 | return Status::OK(); |
969 | 16 | } |
970 | | |
971 | 4 | void IndexBuilder::gc_output_rowset() { |
972 | 4 | for (auto&& output_rowset : _output_rowsets) { |
973 | 2 | auto is_local_rowset = output_rowset->is_local(); |
974 | 2 | DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset", |
975 | 2 | { is_local_rowset = false; }) |
976 | 2 | if (!is_local_rowset) { |
977 | 0 | _tablet->record_unused_remote_rowset(output_rowset->rowset_id(), |
978 | 0 | output_rowset->rowset_meta()->resource_id(), |
979 | 0 | output_rowset->num_segments()); |
980 | 0 | return; |
981 | 0 | } |
982 | 2 | _engine.add_unused_rowset(output_rowset); |
983 | 2 | } |
984 | 4 | } |
985 | | |
986 | | } // namespace doris |