be/src/cloud/cloud_full_compaction.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_full_compaction.h" |
19 | | |
20 | | #include <gen_cpp/cloud.pb.h> |
21 | | |
22 | | #include <boost/container_hash/hash.hpp> |
23 | | |
24 | | #include "cloud/cloud_meta_mgr.h" |
25 | | #include "cloud/cloud_storage_engine.h" |
26 | | #include "cloud/config.h" |
27 | | #include "common/config.h" |
28 | | #include "common/status.h" |
29 | | #include "core/column/column.h" |
30 | | #include "cpp/sync_point.h" |
31 | | #include "service/backend_options.h" |
32 | | #include "storage/compaction/compaction.h" |
33 | | #include "storage/rowset/beta_rowset.h" |
34 | | #include "storage/tablet/tablet_meta.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/thread.h" |
37 | | #include "util/uuid_generator.h" |
38 | | |
39 | | namespace doris { |
40 | | using namespace ErrorCode; |
41 | | |
42 | | bvar::Adder<uint64_t> full_output_size("full_compaction", "output_size"); |
43 | | |
44 | | CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) |
45 | 105 | : CloudCompactionMixin(engine, tablet, |
46 | 105 | "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} |
47 | | |
48 | 105 | CloudFullCompaction::~CloudFullCompaction() = default; |
49 | | |
50 | 105 | Status CloudFullCompaction::prepare_compact() { |
51 | 105 | Status st; |
52 | 105 | Defer defer_set_st([&] { |
53 | 105 | if (!st.ok()) { |
54 | 1 | cloud_tablet()->set_last_full_compaction_status(st.to_string()); |
55 | 1 | cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis()); |
56 | 1 | } |
57 | 105 | }); |
58 | 105 | if (_tablet->tablet_state() != TABLET_RUNNING) { |
59 | 0 | st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); |
60 | 0 | return st; |
61 | 0 | } |
62 | | |
63 | | // always sync latest rowset for full compaction |
64 | 105 | st = cloud_tablet()->sync_rowsets(); |
65 | 105 | RETURN_IF_ERROR(st); |
66 | | |
67 | 105 | st = pick_rowsets_to_compact(); |
68 | 105 | RETURN_IF_ERROR(st); |
69 | | |
70 | 645 | for (auto& rs : _input_rowsets) { |
71 | 645 | _input_row_num += rs->num_rows(); |
72 | 645 | _input_segments += rs->num_segments(); |
73 | 645 | _input_rowsets_data_size += rs->data_disk_size(); |
74 | 645 | _input_rowsets_index_size += rs->index_disk_size(); |
75 | 645 | _input_rowsets_total_size += rs->total_disk_size(); |
76 | 645 | } |
77 | 104 | LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), |
78 | 104 | _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) |
79 | 104 | .tag("job_id", _uuid) |
80 | 104 | .tag("input_rowsets", _input_rowsets.size()) |
81 | 104 | .tag("input_rows", _input_row_num) |
82 | 104 | .tag("input_segments", _input_segments) |
83 | 104 | .tag("input_rowsets_data_size", _input_rowsets_data_size) |
84 | 104 | .tag("input_rowsets_index_size", _input_rowsets_index_size) |
85 | 104 | .tag("input_rowsets_total_size", _input_rowsets_total_size); |
86 | 104 | return Status::OK(); |
87 | 105 | } |
88 | 104 | Status CloudFullCompaction::request_global_lock() { |
89 | | // prepare compaction job |
90 | 104 | cloud::TabletJobInfoPB job; |
91 | 104 | auto idx = job.mutable_idx(); |
92 | 104 | idx->set_tablet_id(_tablet->tablet_id()); |
93 | 104 | idx->set_table_id(_tablet->table_id()); |
94 | 104 | idx->set_index_id(_tablet->index_id()); |
95 | 104 | idx->set_partition_id(_tablet->partition_id()); |
96 | 104 | auto compaction_job = job.add_compaction(); |
97 | 104 | compaction_job->set_id(_uuid); |
98 | 104 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
99 | 104 | std::to_string(config::heartbeat_service_port)); |
100 | 104 | compaction_job->set_type(cloud::TabletCompactionJobPB::FULL); |
101 | 104 | compaction_job->set_base_compaction_cnt(_base_compaction_cnt); |
102 | 104 | compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); |
103 | 104 | using namespace std::chrono; |
104 | 104 | int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
105 | 104 | _expiration = now + config::compaction_timeout_seconds; |
106 | 104 | compaction_job->set_expiration(_expiration); |
107 | 104 | compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); |
108 | | // Set input version range to let meta-service judge version range conflict |
109 | 104 | compaction_job->add_input_versions(_input_rowsets.front()->start_version()); |
110 | 104 | compaction_job->add_input_versions(_input_rowsets.back()->end_version()); |
111 | 104 | cloud::StartTabletJobResponse resp; |
112 | 104 | auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); |
113 | 104 | if (!st.ok()) { |
114 | 0 | if (resp.status().code() == cloud::STALE_TABLET_CACHE) { |
115 | | // set last_sync_time to 0 to force sync tablet next time |
116 | 0 | cloud_tablet()->last_sync_time_s = 0; |
117 | 0 | } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { |
118 | | // tablet not found |
119 | 0 | cloud_tablet()->clear_cache(); |
120 | 0 | } |
121 | 0 | } |
122 | 104 | return st; |
123 | 104 | } |
124 | | |
125 | 105 | Status CloudFullCompaction::pick_rowsets_to_compact() { |
126 | 105 | _input_rowsets.clear(); |
127 | 105 | { |
128 | 105 | std::shared_lock rlock(_tablet->get_header_lock()); |
129 | 105 | _base_compaction_cnt = cloud_tablet()->base_compaction_cnt(); |
130 | 105 | _cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt(); |
131 | 105 | _input_rowsets = cloud_tablet()->pick_candidate_rowsets_to_full_compaction(); |
132 | 105 | } |
133 | 105 | if (auto st = check_version_continuity(_input_rowsets); !st.ok()) { |
134 | 0 | DCHECK(false) << st; |
135 | 0 | return st; |
136 | 0 | } |
137 | 105 | if (_input_rowsets.size() <= 1) { |
138 | 1 | return Status::Error<BE_NO_SUITABLE_VERSION>( |
139 | 1 | "insufficent compaction input rowset, #rowsets={}", _input_rowsets.size()); |
140 | 1 | } |
141 | | |
142 | 104 | if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { |
143 | | // the tablet is with rowset: [0-1], [2-y] |
144 | | // and [0-1] has no data. in this situation, no need to do full compaction. |
145 | 0 | return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); |
146 | 0 | } |
147 | | |
148 | 104 | return Status::OK(); |
149 | 104 | } |
150 | | |
151 | 104 | Status CloudFullCompaction::execute_compact() { |
152 | 104 | DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", { |
153 | 104 | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
154 | 104 | LOG_INFO( |
155 | 104 | "[verbose] CloudFullCompaction::execute_compact.block, target_tablet_id={}, " |
156 | 104 | "tablet_id={}", |
157 | 104 | target_tablet_id, cloud_tablet()->tablet_id()); |
158 | 104 | if (target_tablet_id == cloud_tablet()->tablet_id()) { |
159 | 104 | DBUG_BLOCK; |
160 | 104 | } |
161 | 104 | }); |
162 | 104 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", Status::OK(), |
163 | 104 | this); |
164 | 104 | #ifndef __APPLE__ |
165 | 104 | if (config::enable_base_compaction_idle_sched) { |
166 | 104 | Thread::set_idle_sched(); |
167 | 104 | } |
168 | 104 | #endif |
169 | | |
170 | 104 | SCOPED_ATTACH_TASK(_mem_tracker); |
171 | | |
172 | 104 | using namespace std::chrono; |
173 | 104 | auto start = steady_clock::now(); |
174 | 104 | auto res = CloudCompactionMixin::execute_compact(); |
175 | 104 | cloud_tablet()->set_last_full_compaction_status(res.to_string()); |
176 | 104 | if (!res.ok()) { |
177 | 0 | cloud_tablet()->set_last_full_compaction_failure_time(UnixMillis()); |
178 | 0 | LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res |
179 | 0 | << ", tablet=" << _tablet->tablet_id() |
180 | 0 | << ", output_version=" << _output_version; |
181 | 0 | return res; |
182 | 0 | } |
183 | 104 | LOG_INFO("finish CloudFullCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(), |
184 | 104 | duration_cast<milliseconds>(steady_clock::now() - start).count()) |
185 | 104 | .tag("job_id", _uuid) |
186 | 104 | .tag("input_rowsets", _input_rowsets.size()) |
187 | 104 | .tag("input_rows", _input_row_num) |
188 | 104 | .tag("input_segments", _input_segments) |
189 | 104 | .tag("input_rowsets_data_size", _input_rowsets_data_size) |
190 | 104 | .tag("input_rowsets_index_size", _input_rowsets_index_size) |
191 | 104 | .tag("input_rowsets_total_size", _input_rowsets_total_size) |
192 | 104 | .tag("output_rows", _output_rowset->num_rows()) |
193 | 104 | .tag("output_segments", _output_rowset->num_segments()) |
194 | 104 | .tag("output_rowset_data_size", _output_rowset->data_disk_size()) |
195 | 104 | .tag("output_rowset_index_size", _output_rowset->index_disk_size()) |
196 | 104 | .tag("output_rowset_total_size", _output_rowset->total_disk_size()) |
197 | 104 | .tag("local_read_time_us", _stats.cloud_local_read_time) |
198 | 104 | .tag("remote_read_time_us", _stats.cloud_remote_read_time) |
199 | 104 | .tag("local_read_bytes", _local_read_bytes_total) |
200 | 104 | .tag("remote_read_bytes", _remote_read_bytes_total); |
201 | | |
202 | 104 | _state = CompactionState::SUCCESS; |
203 | | |
204 | 104 | DorisMetrics::instance()->full_compaction_deltas_total->increment(_input_rowsets.size()); |
205 | 104 | DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_total_size); |
206 | 104 | full_output_size << _output_rowset->total_disk_size(); |
207 | | |
208 | 104 | cloud_tablet()->set_last_full_compaction_success_time(UnixMillis()); |
209 | 104 | cloud_tablet()->set_last_full_compaction_status(Status::OK().to_string()); |
210 | 104 | return Status::OK(); |
211 | 104 | } |
212 | | |
213 | 104 | Status CloudFullCompaction::modify_rowsets() { |
214 | | // commit compaction job |
215 | 104 | cloud::TabletJobInfoPB job; |
216 | 104 | auto idx = job.mutable_idx(); |
217 | 104 | idx->set_tablet_id(_tablet->tablet_id()); |
218 | 104 | idx->set_table_id(_tablet->table_id()); |
219 | 104 | idx->set_index_id(_tablet->index_id()); |
220 | 104 | idx->set_partition_id(_tablet->partition_id()); |
221 | 104 | auto compaction_job = job.add_compaction(); |
222 | 104 | compaction_job->set_id(_uuid); |
223 | 104 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
224 | 104 | std::to_string(config::heartbeat_service_port)); |
225 | 104 | compaction_job->set_type(cloud::TabletCompactionJobPB::FULL); |
226 | 104 | compaction_job->set_input_cumulative_point(cloud_tablet()->cumulative_layer_point()); |
227 | 104 | compaction_job->set_output_cumulative_point(_output_rowset->end_version() + 1); |
228 | 104 | compaction_job->set_num_input_rows(_input_row_num); |
229 | 104 | compaction_job->set_num_output_rows(_output_rowset->num_rows()); |
230 | 104 | compaction_job->set_size_input_rowsets(_input_rowsets_total_size); |
231 | 104 | compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size()); |
232 | 104 | DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_compaction_data_size", { |
233 | 104 | compaction_job->set_size_input_rowsets(1); |
234 | 104 | compaction_job->set_size_output_rowsets(10000001); |
235 | 104 | }) |
236 | 104 | compaction_job->set_num_input_segments(_input_segments); |
237 | 104 | compaction_job->set_num_output_segments(_output_rowset->num_segments()); |
238 | 104 | compaction_job->set_num_input_rowsets(num_input_rowsets()); |
239 | 104 | compaction_job->set_num_output_rowsets(1); |
240 | 104 | compaction_job->add_input_versions(_input_rowsets.front()->start_version()); |
241 | 104 | compaction_job->add_input_versions(_input_rowsets.back()->end_version()); |
242 | 104 | compaction_job->add_output_versions(_output_rowset->end_version()); |
243 | 104 | compaction_job->add_txn_id(_output_rowset->txn_id()); |
244 | 104 | compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string()); |
245 | 104 | compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size); |
246 | 104 | compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size); |
247 | 104 | compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size()); |
248 | 104 | compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size()); |
249 | | |
250 | 104 | DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.block", DBUG_BLOCK); |
251 | | |
252 | 104 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
253 | 104 | _tablet->enable_unique_key_merge_on_write()) { |
254 | 5 | RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator())); |
255 | 5 | compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); |
256 | 5 | } |
257 | | |
258 | 104 | cloud::FinishTabletJobResponse resp; |
259 | 104 | auto st = _engine.meta_mgr().commit_tablet_job(job, &resp); |
260 | 104 | if (!st.ok()) { |
261 | 0 | if (resp.status().code() == cloud::TABLET_NOT_FOUND) { |
262 | 0 | cloud_tablet()->clear_cache(); |
263 | 0 | } |
264 | 0 | return st; |
265 | 0 | } |
266 | 104 | auto& stats = resp.stats(); |
267 | 104 | LOG(INFO) << "tablet stats=" << stats.ShortDebugString(); |
268 | | |
269 | 104 | { |
270 | 104 | std::unique_lock wrlock(_tablet->get_header_lock()); |
271 | | // clang-format off |
272 | 104 | cloud_tablet()->set_last_base_compaction_success_time(std::max(cloud_tablet()->last_base_compaction_success_time(), stats.last_base_compaction_time_ms())); |
273 | 104 | cloud_tablet()->set_last_cumu_compaction_success_time(std::max(cloud_tablet()->last_cumu_compaction_success_time(), stats.last_cumu_compaction_time_ms())); |
274 | 104 | cloud_tablet()->set_last_full_compaction_success_time(std::max(cloud_tablet()->last_full_compaction_success_time(), stats.last_full_compaction_time_ms())); |
275 | | // clang-format on |
276 | 104 | if (cloud_tablet()->base_compaction_cnt() >= stats.base_compaction_cnt()) { |
277 | | // This could happen while calling `sync_tablet_rowsets` during `commit_tablet_job` |
278 | 0 | return Status::OK(); |
279 | 0 | } |
280 | | // Try to make output rowset visible immediately in tablet cache, instead of waiting for next synchronization from meta-service. |
281 | 104 | cloud_tablet()->delete_rowsets(_input_rowsets, wrlock); |
282 | 104 | cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock); |
283 | 104 | cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); |
284 | 104 | cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt()); |
285 | 104 | cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point()); |
286 | 104 | if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) { |
287 | 104 | cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
288 | 104 | stats.num_rows(), stats.data_size()); |
289 | 104 | } |
290 | 104 | } |
291 | 0 | _tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset); |
292 | 104 | return Status::OK(); |
293 | 104 | } |
294 | | |
295 | 0 | Status CloudFullCompaction::garbage_collection() { |
296 | | //file_cache_garbage_collection(); |
297 | 0 | cloud::TabletJobInfoPB job; |
298 | 0 | auto idx = job.mutable_idx(); |
299 | 0 | idx->set_tablet_id(_tablet->tablet_id()); |
300 | 0 | idx->set_table_id(_tablet->table_id()); |
301 | 0 | idx->set_index_id(_tablet->index_id()); |
302 | 0 | idx->set_partition_id(_tablet->partition_id()); |
303 | 0 | auto compaction_job = job.add_compaction(); |
304 | 0 | compaction_job->set_id(_uuid); |
305 | 0 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
306 | 0 | std::to_string(config::heartbeat_service_port)); |
307 | 0 | compaction_job->set_type(cloud::TabletCompactionJobPB::FULL); |
308 | 0 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
309 | 0 | _tablet->enable_unique_key_merge_on_write()) { |
310 | 0 | compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); |
311 | 0 | } |
312 | 0 | auto st = _engine.meta_mgr().abort_tablet_job(job); |
313 | 0 | if (!st.ok()) { |
314 | 0 | LOG_WARNING("failed to abort compaction job") |
315 | 0 | .tag("job_id", _uuid) |
316 | 0 | .tag("tablet_id", _tablet->tablet_id()) |
317 | 0 | .error(st); |
318 | 0 | } |
319 | 0 | return st; |
320 | 0 | } |
321 | | |
322 | 5 | void CloudFullCompaction::do_lease() { |
323 | 5 | cloud::TabletJobInfoPB job; |
324 | 5 | auto idx = job.mutable_idx(); |
325 | 5 | idx->set_tablet_id(_tablet->tablet_id()); |
326 | 5 | idx->set_table_id(_tablet->table_id()); |
327 | 5 | idx->set_index_id(_tablet->index_id()); |
328 | 5 | idx->set_partition_id(_tablet->partition_id()); |
329 | 5 | auto compaction_job = job.add_compaction(); |
330 | 5 | compaction_job->set_id(_uuid); |
331 | 5 | using namespace std::chrono; |
332 | 5 | int64_t lease_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count() + |
333 | 5 | config::lease_compaction_interval_seconds * 4; |
334 | 5 | compaction_job->set_lease(lease_time); |
335 | 5 | auto st = _engine.meta_mgr().lease_tablet_job(job); |
336 | 5 | if (!st.ok()) { |
337 | 0 | LOG_WARNING("failed to lease compaction job") |
338 | 0 | .tag("job_id", _uuid) |
339 | 0 | .tag("tablet_id", _tablet->tablet_id()) |
340 | 0 | .error(st); |
341 | 0 | } |
342 | 5 | } |
343 | | |
344 | 5 | Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t initiator) { |
345 | 5 | DCHECK(_output_rowset->start_version() <= 2) << _output_rowset->start_version(); |
346 | 5 | std::vector<RowsetSharedPtr> tmp_rowsets {}; |
347 | 5 | DeleteBitmapPtr delete_bitmap = |
348 | 5 | std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id()); |
349 | 5 | RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(cloud_tablet())); |
350 | 5 | int64_t max_version = cloud_tablet()->max_version().second; |
351 | 5 | DCHECK(max_version >= _output_rowset->version().second); |
352 | 5 | if (max_version > _output_rowset->version().second) { |
353 | 0 | auto ret = DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked( |
354 | 0 | {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {})); |
355 | 0 | tmp_rowsets = std::move(ret.rowsets); |
356 | 0 | } |
357 | 5 | for (const auto& it : tmp_rowsets) { |
358 | 0 | int64_t cur_version = it->rowset_meta()->start_version(); |
359 | 0 | RETURN_IF_ERROR(_cloud_full_compaction_calc_delete_bitmap(it, cur_version, delete_bitmap)); |
360 | 0 | } |
361 | | |
362 | 5 | RETURN_IF_ERROR( |
363 | 5 | _engine.meta_mgr().get_delete_bitmap_update_lock(*cloud_tablet(), -1, initiator)); |
364 | 5 | RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(cloud_tablet())); |
365 | 5 | std::lock_guard header_lock(_tablet->get_header_lock()); |
366 | 62 | for (const auto& it : cloud_tablet()->rowset_map()) { |
367 | 62 | int64_t cur_version = it.first.first; |
368 | 62 | const RowsetSharedPtr& published_rowset = it.second; |
369 | 62 | if (cur_version > max_version) { |
370 | 0 | RETURN_IF_ERROR(_cloud_full_compaction_calc_delete_bitmap(published_rowset, cur_version, |
371 | 0 | delete_bitmap)); |
372 | 0 | } |
373 | 62 | } |
374 | 5 | std::optional<StorageResource> storage_resource; |
375 | 5 | auto storage_resource_result = _output_rowset->rowset_meta()->remote_storage_resource(); |
376 | 5 | if (storage_resource_result) { |
377 | 5 | storage_resource = *storage_resource_result.value(); |
378 | 5 | } |
379 | 5 | RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( |
380 | 5 | *cloud_tablet(), -1, initiator, delete_bitmap.get(), delete_bitmap.get(), |
381 | 5 | _output_rowset->rowset_id().to_string(), storage_resource, |
382 | 5 | config::delete_bitmap_store_write_version)); |
383 | 5 | LOG_INFO("update delete bitmap in CloudFullCompaction, tablet_id={}, range=[{}-{}]", |
384 | 5 | _tablet->tablet_id(), _input_rowsets.front()->start_version(), |
385 | 5 | _input_rowsets.back()->end_version()) |
386 | 5 | .tag("job_id", _uuid) |
387 | 5 | .tag("initiator", initiator) |
388 | 5 | .tag("input_rowsets", _input_rowsets.size()) |
389 | 5 | .tag("input_rows", _input_row_num) |
390 | 5 | .tag("input_segments", _input_segments) |
391 | 5 | .tag("input_rowsets_total_size", _input_rowsets_total_size) |
392 | 5 | .tag("update_bitmap_size", delete_bitmap->delete_bitmap.size()); |
393 | 5 | _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap); |
394 | 5 | return Status::OK(); |
395 | 5 | } |
396 | | |
397 | | Status CloudFullCompaction::_cloud_full_compaction_calc_delete_bitmap( |
398 | | const RowsetSharedPtr& published_rowset, int64_t cur_version, |
399 | 0 | DeleteBitmapPtr delete_bitmap) { |
400 | 0 | std::vector<segment_v2::SegmentSharedPtr> segments; |
401 | 0 | RETURN_IF_ERROR( |
402 | 0 | std::static_pointer_cast<BetaRowset>(published_rowset)->load_segments(&segments)); |
403 | 0 | std::vector<RowsetSharedPtr> specified_rowsets {_output_rowset}; |
404 | 0 | DeleteBitmapPtr tmp_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id()); |
405 | |
|
406 | 0 | OlapStopWatch watch; |
407 | 0 | auto token = _engine.calc_delete_bitmap_executor()->create_token(); |
408 | 0 | RETURN_IF_ERROR(BaseTablet::calc_delete_bitmap( |
409 | 0 | _tablet, published_rowset, segments, specified_rowsets, tmp_delete_bitmap, cur_version, |
410 | 0 | token.get(), _output_rs_writer.get())); |
411 | 0 | RETURN_IF_ERROR(token->wait()); |
412 | 0 | size_t total_rows = std::accumulate( |
413 | 0 | segments.begin(), segments.end(), 0, |
414 | 0 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); |
415 | 0 | for (const auto& [k, v] : tmp_delete_bitmap->delete_bitmap) { |
416 | 0 | if (std::get<0>(k) == _output_rowset->rowset_id() && |
417 | 0 | std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) { |
418 | 0 | delete_bitmap->merge({std::get<0>(k), std::get<1>(k), cur_version}, v); |
419 | 0 | } |
420 | 0 | } |
421 | 0 | VLOG_DEBUG << "[Full compaction] construct delete bitmap tablet: " << _tablet->tablet_id() |
422 | 0 | << ", published rowset version: [" << published_rowset->version().first << "-" |
423 | 0 | << published_rowset->version().second << "]" |
424 | 0 | << ", full compaction rowset version: [" << _output_rowset->version().first << "-" |
425 | 0 | << _output_rowset->version().second << "]" |
426 | 0 | << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; |
427 | 0 | return Status::OK(); |
428 | 0 | } |
429 | | |
430 | | } // namespace doris |