be/src/cloud/cloud_base_compaction.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_base_compaction.h" |
19 | | |
20 | | #include <gen_cpp/cloud.pb.h> |
21 | | |
22 | | #include <boost/container_hash/hash.hpp> |
23 | | |
24 | | #include "cloud/cloud_meta_mgr.h" |
25 | | #include "cloud/config.h" |
26 | | #include "common/config.h" |
27 | | #include "core/value/vdatetime_value.h" |
28 | | #include "cpp/sync_point.h" |
29 | | #include "service/backend_options.h" |
30 | | #include "storage/compaction/compaction.h" |
31 | | #include "storage/task/engine_checksum_task.h" |
32 | | #include "util/thread.h" |
33 | | #include "util/uuid_generator.h" |
34 | | |
35 | | namespace doris { |
36 | | using namespace ErrorCode; |
37 | | |
38 | | bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size"); |
39 | | bvar::Adder<uint64_t> base_input_cached_size("base_compaction", "input_cached_size"); |
40 | | bvar::Adder<uint64_t> base_input_size("base_compaction", "input_size"); |
41 | | bvar::LatencyRecorder g_base_compaction_hold_delete_bitmap_lock_time_ms( |
42 | | "base_compaction_hold_delete_bitmap_lock_time_ms"); |
43 | | |
44 | | CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) |
45 | 3.88k | : CloudCompactionMixin(engine, tablet, |
46 | 3.88k | "BaseCompaction:" + std::to_string(tablet->tablet_id())) {} |
47 | | |
48 | 3.88k | CloudBaseCompaction::~CloudBaseCompaction() = default; |
49 | | |
50 | 3.87k | Status CloudBaseCompaction::prepare_compact() { |
51 | 3.87k | Status st; |
52 | 3.87k | Defer defer_set_st([&] { |
53 | 3.87k | if (!st.ok()) { |
54 | 3.79k | cloud_tablet()->set_last_base_compaction_status(st.to_string()); |
55 | 3.79k | cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); |
56 | 3.79k | } |
57 | 3.87k | }); |
58 | 3.87k | if (_tablet->tablet_state() != TABLET_RUNNING) { |
59 | 0 | st = Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id()); |
60 | 0 | return st; |
61 | 0 | } |
62 | | |
63 | 3.87k | bool need_sync_tablet = true; |
64 | 3.87k | { |
65 | 3.87k | std::shared_lock rlock(_tablet->get_header_lock()); |
66 | | // If number of rowsets is equal to approximate_num_rowsets, it is very likely that this tablet has been |
67 | | // synchronized with meta-service. |
68 | 3.87k | if (_tablet->tablet_meta()->all_rs_metas().size() >= |
69 | 3.87k | cloud_tablet()->fetch_add_approximate_num_rowsets(0) && |
70 | 3.87k | cloud_tablet()->last_sync_time_s > 0) { |
71 | 3.69k | need_sync_tablet = false; |
72 | 3.69k | } |
73 | 3.87k | } |
74 | 3.87k | if (need_sync_tablet) { |
75 | 180 | st = cloud_tablet()->sync_rowsets(); |
76 | 180 | RETURN_IF_ERROR(st); |
77 | 180 | } |
78 | | |
79 | 3.87k | st = pick_rowsets_to_compact(); |
80 | 3.87k | RETURN_IF_ERROR(st); |
81 | | |
82 | 797 | for (auto& rs : _input_rowsets) { |
83 | 797 | _input_row_num += rs->num_rows(); |
84 | 797 | _input_segments += rs->num_segments(); |
85 | 797 | _input_rowsets_data_size += rs->data_disk_size(); |
86 | 797 | _input_rowsets_index_size += rs->index_disk_size(); |
87 | 797 | _input_rowsets_total_size += rs->total_disk_size(); |
88 | 797 | _input_rowsets_cached_data_size += rs->approximate_cached_data_size(); |
89 | 797 | _input_rowsets_cached_index_size += rs->approximate_cache_index_size(); |
90 | 797 | } |
91 | 83 | LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(), |
92 | 83 | _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) |
93 | 83 | .tag("job_id", _uuid) |
94 | 83 | .tag("input_rowsets", _input_rowsets.size()) |
95 | 83 | .tag("input_rows", _input_row_num) |
96 | 83 | .tag("input_segments", _input_segments) |
97 | 83 | .tag("input_rowsets_data_size", _input_rowsets_data_size) |
98 | 83 | .tag("input_rowsets_index_size", _input_rowsets_index_size) |
99 | 83 | .tag("input_rowsets_total_size", _input_rowsets_total_size) |
100 | 83 | .tag("input_rowsets_cached_data_size", _input_rowsets_cached_data_size) |
101 | 83 | .tag("input_rowsets_cached_index_size", _input_rowsets_cached_index_size); |
102 | 83 | base_input_cached_size << (_input_rowsets_cached_data_size + _input_rowsets_cached_index_size); |
103 | 83 | base_input_size << _input_rowsets_total_size; |
104 | 83 | return Status::OK(); |
105 | 3.87k | } |
106 | | |
107 | 83 | Status CloudBaseCompaction::request_global_lock() { |
108 | | // prepare compaction job |
109 | 83 | cloud::TabletJobInfoPB job; |
110 | 83 | auto idx = job.mutable_idx(); |
111 | 83 | idx->set_tablet_id(_tablet->tablet_id()); |
112 | 83 | idx->set_table_id(_tablet->table_id()); |
113 | 83 | idx->set_index_id(_tablet->index_id()); |
114 | 83 | idx->set_partition_id(_tablet->partition_id()); |
115 | 83 | auto compaction_job = job.add_compaction(); |
116 | 83 | compaction_job->set_id(_uuid); |
117 | 83 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
118 | 83 | std::to_string(config::heartbeat_service_port)); |
119 | 83 | compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); |
120 | 83 | compaction_job->set_base_compaction_cnt(_base_compaction_cnt); |
121 | 83 | compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt); |
122 | 83 | compaction_job->add_input_versions(_input_rowsets.front()->start_version()); |
123 | 83 | compaction_job->add_input_versions(_input_rowsets.back()->end_version()); |
124 | 83 | using namespace std::chrono; |
125 | 83 | int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
126 | 83 | _expiration = now + config::compaction_timeout_seconds; |
127 | 83 | compaction_job->set_expiration(_expiration); |
128 | 83 | compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4); |
129 | 83 | cloud::StartTabletJobResponse resp; |
130 | 83 | auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); |
131 | 83 | cloud_tablet()->set_last_base_compaction_status(st.to_string()); |
132 | 83 | if (resp.has_alter_version()) { |
133 | 0 | (static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version()); |
134 | 0 | } |
135 | 83 | if (!st.ok()) { |
136 | 6 | cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); |
137 | 6 | if (resp.status().code() == cloud::STALE_TABLET_CACHE) { |
138 | | // set last_sync_time to 0 to force sync tablet next time |
139 | 0 | cloud_tablet()->last_sync_time_s = 0; |
140 | 6 | } else if (resp.status().code() == cloud::TABLET_NOT_FOUND) { |
141 | | // tablet not found |
142 | 6 | cloud_tablet()->clear_cache(); |
143 | 6 | } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { |
144 | 0 | auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get())); |
145 | 0 | std::stringstream ss; |
146 | 0 | ss << "failed to prepare cumu compaction. Check compaction input versions " |
147 | 0 | "failed in schema change. The input version end must " |
148 | 0 | "less than or equal to alter_version." |
149 | 0 | "current alter version in BE is not correct." |
150 | 0 | "input_version_start=" |
151 | 0 | << compaction_job->input_versions(0) |
152 | 0 | << " input_version_end=" << compaction_job->input_versions(1) |
153 | 0 | << " current alter_version=" << cloud_tablet->alter_version() |
154 | 0 | << " schema_change_alter_version=" << resp.alter_version(); |
155 | 0 | std::string msg = ss.str(); |
156 | 0 | LOG(WARNING) << msg; |
157 | 0 | return Status::InternalError(msg); |
158 | 0 | } |
159 | 6 | } |
160 | 83 | return st; |
161 | 83 | } |
162 | | |
163 | 3.87k | void CloudBaseCompaction::_filter_input_rowset() { |
164 | | // if dup_key and no delete predicate |
165 | | // we skip big files to save resources |
166 | 3.87k | if (_tablet->keys_type() != KeysType::DUP_KEYS) { |
167 | 1.50k | return; |
168 | 1.50k | } |
169 | 2.47k | for (auto& rs : _input_rowsets) { |
170 | 2.47k | if (rs->rowset_meta()->has_delete_predicate()) { |
171 | 1.01k | return; |
172 | 1.01k | } |
173 | 2.47k | } |
174 | 1.35k | int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 1024 * 1024; |
175 | | // first find a proper rowset for start |
176 | 1.35k | auto rs_iter = _input_rowsets.begin(); |
177 | 1.35k | while (rs_iter != _input_rowsets.end()) { |
178 | 439 | if ((*rs_iter)->rowset_meta()->total_disk_size() >= max_size) { |
179 | 0 | rs_iter = _input_rowsets.erase(rs_iter); |
180 | 439 | } else { |
181 | 439 | break; |
182 | 439 | } |
183 | 439 | } |
184 | 1.35k | } |
185 | | |
186 | 3.87k | Status CloudBaseCompaction::pick_rowsets_to_compact() { |
187 | 3.87k | _input_rowsets.clear(); |
188 | 3.87k | { |
189 | 3.87k | std::shared_lock rlock(_tablet->get_header_lock()); |
190 | 3.87k | _base_compaction_cnt = cloud_tablet()->base_compaction_cnt(); |
191 | 3.87k | _cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt(); |
192 | 3.87k | _input_rowsets = cloud_tablet()->pick_candidate_rowsets_to_base_compaction(); |
193 | 3.87k | } |
194 | 3.87k | if (auto st = check_version_continuity(_input_rowsets); !st.ok()) { |
195 | 0 | DCHECK(false) << st; |
196 | 0 | return st; |
197 | 0 | } |
198 | 3.87k | _filter_input_rowset(); |
199 | 3.87k | if (_input_rowsets.size() <= 1) { |
200 | 1.73k | return Status::Error<BE_NO_SUITABLE_VERSION>( |
201 | 1.73k | "insufficent compaction input rowset, #rowsets={}", _input_rowsets.size()); |
202 | 1.73k | } |
203 | | |
204 | 2.14k | if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { |
205 | | // the tablet is with rowset: [0-1], [2-y] |
206 | | // and [0-1] has no data. in this situation, no need to do base compaction. |
207 | 0 | return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); |
208 | 0 | } |
209 | | |
210 | 2.14k | int score = 0; |
211 | 2.14k | int rowset_cnt = 0; |
212 | 2.14k | int64_t max_compaction_score = _tablet->keys_type() == KeysType::UNIQUE_KEYS && |
213 | 2.14k | _tablet->enable_unique_key_merge_on_write() |
214 | 2.14k | ? config::mow_base_compaction_max_compaction_score |
215 | 2.14k | : config::base_compaction_max_compaction_score; |
216 | 12.5k | while (rowset_cnt < _input_rowsets.size()) { |
217 | 10.4k | score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); |
218 | 10.4k | if (score > max_compaction_score) { |
219 | 13 | break; |
220 | 13 | } |
221 | 10.4k | } |
222 | 2.14k | _input_rowsets.resize(rowset_cnt); |
223 | | |
224 | | // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold |
225 | 2.14k | if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { |
226 | 54 | VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() |
227 | 0 | << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 |
228 | 0 | << ", base_compaction_num_cumulative_rowsets=" |
229 | 0 | << config::base_compaction_min_rowset_num; |
230 | 54 | apply_txn_size_truncation_and_log("CloudBaseCompaction"); |
231 | 54 | return Status::OK(); |
232 | 54 | } |
233 | | |
234 | | // 2. the ratio between base rowset and all input cumulative rowsets reaches the threshold |
235 | | // `_input_rowsets` has been sorted by end version, so we consider `_input_rowsets[0]` is the base rowset. |
236 | 2.09k | int64_t base_size = _input_rowsets.front()->data_disk_size(); |
237 | 2.09k | int64_t cumulative_total_size = 0; |
238 | 9.75k | for (auto it = _input_rowsets.begin() + 1; it != _input_rowsets.end(); ++it) { |
239 | 7.66k | cumulative_total_size += (*it)->data_disk_size(); |
240 | 7.66k | } |
241 | | |
242 | 2.09k | double base_cumulative_delta_ratio = config::base_compaction_min_data_ratio; |
243 | 2.09k | if (base_size == 0) { |
244 | | // base_size == 0 means this may be a base version [0-1], which has no data. |
245 | | // set to 1 to void divide by zero |
246 | 1.64k | base_size = 1; |
247 | 1.64k | } |
248 | 2.09k | double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size; |
249 | | |
250 | 2.09k | if (cumulative_base_ratio > base_cumulative_delta_ratio) { |
251 | 29 | VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() |
252 | 0 | << ", cumulative_total_size=" << cumulative_total_size |
253 | 0 | << ", base_size=" << base_size |
254 | 0 | << ", cumulative_base_ratio=" << cumulative_base_ratio |
255 | 0 | << ", policy_ratio=" << base_cumulative_delta_ratio; |
256 | 29 | apply_txn_size_truncation_and_log("CloudBaseCompaction"); |
257 | 29 | return Status::OK(); |
258 | 29 | } |
259 | | |
260 | | // 3. the interval since last base compaction reaches the threshold |
261 | 2.06k | int64_t base_creation_time = _input_rowsets[0]->creation_time(); |
262 | 2.06k | int64_t interval_threshold = config::base_compaction_interval_seconds_since_last_operation; |
263 | 2.06k | int64_t interval_since_last_base_compaction = time(nullptr) - base_creation_time; |
264 | 2.06k | if (interval_since_last_base_compaction > interval_threshold) { |
265 | 0 | VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() |
266 | 0 | << ", interval_since_last_base_compaction=" |
267 | 0 | << interval_since_last_base_compaction |
268 | 0 | << ", interval_threshold=" << interval_threshold; |
269 | 0 | apply_txn_size_truncation_and_log("CloudBaseCompaction"); |
270 | 0 | return Status::OK(); |
271 | 0 | } |
272 | | |
273 | 2.06k | VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" << _tablet->tablet_id() |
274 | 0 | << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1 |
275 | 0 | << ", cumulative_base_ratio=" << cumulative_base_ratio |
276 | 0 | << ", interval_since_last_base_compaction=" << interval_since_last_base_compaction; |
277 | 2.06k | return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); |
278 | 2.06k | } |
279 | | |
280 | 77 | Status CloudBaseCompaction::execute_compact() { |
281 | 77 | #ifndef __APPLE__ |
282 | 77 | if (config::enable_base_compaction_idle_sched) { |
283 | 77 | Thread::set_idle_sched(); |
284 | 77 | } |
285 | 77 | #endif |
286 | | |
287 | 77 | SCOPED_ATTACH_TASK(_mem_tracker); |
288 | | |
289 | 77 | using namespace std::chrono; |
290 | 77 | auto start = steady_clock::now(); |
291 | 77 | Status st; |
292 | 77 | Defer defer_set_st([&] { |
293 | 77 | cloud_tablet()->set_last_base_compaction_status(st.to_string()); |
294 | 77 | if (!st.ok()) { |
295 | 0 | cloud_tablet()->set_last_base_compaction_failure_time(UnixMillis()); |
296 | 77 | } else { |
297 | 77 | cloud_tablet()->set_last_base_compaction_success_time(UnixMillis()); |
298 | 77 | } |
299 | 77 | }); |
300 | 77 | st = CloudCompactionMixin::execute_compact(); |
301 | 77 | if (!st.ok()) { |
302 | 0 | LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << st |
303 | 0 | << ", tablet=" << _tablet->tablet_id() |
304 | 0 | << ", output_version=" << _output_version; |
305 | 0 | return st; |
306 | 0 | } |
307 | 77 | LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]", |
308 | 77 | _tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(), |
309 | 77 | _input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()) |
310 | 77 | .tag("job_id", _uuid) |
311 | 77 | .tag("input_rowsets", _input_rowsets.size()) |
312 | 77 | .tag("input_rows", _input_row_num) |
313 | 77 | .tag("input_segments", _input_segments) |
314 | 77 | .tag("input_rowsets_data_size", _input_rowsets_data_size) |
315 | 77 | .tag("input_rowsets_index_size", _input_rowsets_index_size) |
316 | 77 | .tag("input_rowsets_total", _input_rowsets_total_size) |
317 | 77 | .tag("output_rows", _output_rowset->num_rows()) |
318 | 77 | .tag("output_segments", _output_rowset->num_segments()) |
319 | 77 | .tag("output_rowset_data_size", _output_rowset->data_disk_size()) |
320 | 77 | .tag("output_rowset_index_size", _output_rowset->index_disk_size()) |
321 | 77 | .tag("output_rowset_total_size", _output_rowset->total_disk_size()) |
322 | 77 | .tag("local_read_time_us", _stats.cloud_local_read_time) |
323 | 77 | .tag("remote_read_time_us", _stats.cloud_remote_read_time) |
324 | 77 | .tag("local_read_bytes", _local_read_bytes_total) |
325 | 77 | .tag("remote_read_bytes", _remote_read_bytes_total); |
326 | | |
327 | | //_compaction_succeed = true; |
328 | 77 | _state = CompactionState::SUCCESS; |
329 | | |
330 | 77 | DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); |
331 | 77 | DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size); |
332 | 77 | base_output_size << _output_rowset->total_disk_size(); |
333 | | |
334 | 77 | st = Status::OK(); |
335 | 77 | return st; |
336 | 77 | } |
337 | | |
338 | 77 | Status CloudBaseCompaction::modify_rowsets() { |
339 | | // commit compaction job |
340 | 77 | cloud::TabletJobInfoPB job; |
341 | 77 | auto idx = job.mutable_idx(); |
342 | 77 | idx->set_tablet_id(_tablet->tablet_id()); |
343 | 77 | idx->set_table_id(_tablet->table_id()); |
344 | 77 | idx->set_index_id(_tablet->index_id()); |
345 | 77 | idx->set_partition_id(_tablet->partition_id()); |
346 | 77 | auto compaction_job = job.add_compaction(); |
347 | 77 | compaction_job->set_id(_uuid); |
348 | 77 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
349 | 77 | std::to_string(config::heartbeat_service_port)); |
350 | 77 | compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); |
351 | 77 | compaction_job->set_input_cumulative_point(cloud_tablet()->cumulative_layer_point()); |
352 | 77 | compaction_job->set_output_cumulative_point(cloud_tablet()->cumulative_layer_point()); |
353 | 77 | compaction_job->set_num_input_rows(_input_row_num); |
354 | 77 | compaction_job->set_num_output_rows(_output_rowset->num_rows()); |
355 | 77 | compaction_job->set_size_input_rowsets(_input_rowsets_total_size); |
356 | 77 | compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size()); |
357 | 77 | compaction_job->set_num_input_segments(_input_segments); |
358 | 77 | compaction_job->set_num_output_segments(_output_rowset->num_segments()); |
359 | 77 | compaction_job->set_num_input_rowsets(num_input_rowsets()); |
360 | 77 | compaction_job->set_num_output_rowsets(1); |
361 | 77 | compaction_job->add_input_versions(_input_rowsets.front()->start_version()); |
362 | 77 | compaction_job->add_input_versions(_input_rowsets.back()->end_version()); |
363 | 77 | compaction_job->add_output_versions(_output_rowset->end_version()); |
364 | 77 | compaction_job->add_txn_id(_output_rowset->txn_id()); |
365 | 77 | compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string()); |
366 | 77 | compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size); |
367 | 77 | compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size); |
368 | 77 | compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size()); |
369 | 77 | compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size()); |
370 | | |
371 | 77 | DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; |
372 | 77 | int64_t get_delete_bitmap_lock_start_time = 0; |
373 | 77 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
374 | 77 | _tablet->enable_unique_key_merge_on_write()) { |
375 | 16 | int64_t initiator = this->initiator(); |
376 | 16 | RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( |
377 | 16 | _input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(), |
378 | 16 | _stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap, |
379 | 16 | _allow_delete_in_cumu_compaction, get_delete_bitmap_lock_start_time)); |
380 | 16 | LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]", |
381 | 16 | _tablet->tablet_id(), _input_rowsets.front()->start_version(), |
382 | 16 | _input_rowsets.back()->end_version()) |
383 | 16 | .tag("job_id", _uuid) |
384 | 16 | .tag("initiator", initiator) |
385 | 16 | .tag("input_rowsets", _input_rowsets.size()) |
386 | 16 | .tag("input_rows", _input_row_num) |
387 | 16 | .tag("input_segments", _input_segments) |
388 | 16 | .tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size()); |
389 | 16 | compaction_job->set_delete_bitmap_lock_initiator(initiator); |
390 | 16 | } |
391 | | |
392 | 77 | cloud::FinishTabletJobResponse resp; |
393 | 77 | auto st = _engine.meta_mgr().commit_tablet_job(job, &resp); |
394 | 77 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
395 | 77 | _tablet->enable_unique_key_merge_on_write()) { |
396 | 16 | int64_t hold_delete_bitmap_lock_time_ms = |
397 | 16 | (MonotonicMicros() - get_delete_bitmap_lock_start_time) / 1000; |
398 | 16 | g_base_compaction_hold_delete_bitmap_lock_time_ms << hold_delete_bitmap_lock_time_ms; |
399 | 16 | } |
400 | 77 | if (!st.ok()) { |
401 | 0 | if (resp.status().code() == cloud::TABLET_NOT_FOUND) { |
402 | 0 | cloud_tablet()->clear_cache(); |
403 | 0 | } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) { |
404 | 0 | auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get())); |
405 | 0 | std::stringstream ss; |
406 | 0 | ss << "failed to prepare cumu compaction. Check compaction input versions " |
407 | 0 | "failed in schema change. The input version end must " |
408 | 0 | "less than or equal to alter_version." |
409 | 0 | "current alter version in BE is not correct." |
410 | 0 | "input_version_start=" |
411 | 0 | << compaction_job->input_versions(0) |
412 | 0 | << " input_version_end=" << compaction_job->input_versions(1) |
413 | 0 | << " current alter_version=" << cloud_tablet->alter_version() |
414 | 0 | << " schema_change_alter_version=" << resp.alter_version(); |
415 | 0 | std::string msg = ss.str(); |
416 | 0 | LOG(WARNING) << msg; |
417 | 0 | cloud_tablet->set_alter_version(resp.alter_version()); |
418 | 0 | return Status::InternalError(msg); |
419 | 0 | } |
420 | 0 | return st; |
421 | 0 | } |
422 | 77 | auto& stats = resp.stats(); |
423 | 77 | LOG(INFO) << "tablet stats=" << stats.ShortDebugString(); |
424 | | |
425 | 77 | { |
426 | 77 | std::unique_lock wrlock(_tablet->get_header_lock()); |
427 | | // clang-format off |
428 | 77 | cloud_tablet()->set_last_base_compaction_success_time(std::max(cloud_tablet()->last_base_compaction_success_time(), stats.last_base_compaction_time_ms())); |
429 | 77 | cloud_tablet()->set_last_cumu_compaction_success_time(std::max(cloud_tablet()->last_cumu_compaction_success_time(), stats.last_cumu_compaction_time_ms())); |
430 | 77 | cloud_tablet()->set_last_full_compaction_success_time(std::max(cloud_tablet()->last_full_compaction_success_time(), stats.last_full_compaction_time_ms())); |
431 | | // clang-format on |
432 | 77 | if (cloud_tablet()->base_compaction_cnt() >= stats.base_compaction_cnt()) { |
433 | | // This could happen while calling `sync_tablet_rowsets` during `commit_tablet_job` |
434 | 0 | return Status::OK(); |
435 | 0 | } |
436 | | // Try to make output rowset visible immediately in tablet cache, instead of waiting for next synchronization from meta-service. |
437 | 77 | cloud_tablet()->delete_rowsets(_input_rowsets, wrlock); |
438 | 77 | cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock); |
439 | | // ATTN: MUST NOT update `cumu_compaction_cnt` or `cumu_point` which are used when sync rowsets, otherwise may cause |
440 | | // the tablet to be unable to synchronize the rowset meta changes generated by cumu compaction. |
441 | 77 | cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); |
442 | 77 | if (output_rowset_delete_bitmap) { |
443 | 16 | _tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap); |
444 | 16 | } |
445 | 77 | if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) { |
446 | 77 | cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
447 | 77 | stats.num_rows(), stats.data_size()); |
448 | 77 | } |
449 | 77 | } |
450 | 0 | _tablet->prefill_dbm_agg_cache_after_compaction(_output_rowset); |
451 | 77 | return Status::OK(); |
452 | 77 | } |
453 | | |
454 | 0 | Status CloudBaseCompaction::garbage_collection() { |
455 | 0 | RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection()); |
456 | 0 | cloud::TabletJobInfoPB job; |
457 | 0 | auto idx = job.mutable_idx(); |
458 | 0 | idx->set_tablet_id(_tablet->tablet_id()); |
459 | 0 | idx->set_table_id(_tablet->table_id()); |
460 | 0 | idx->set_index_id(_tablet->index_id()); |
461 | 0 | idx->set_partition_id(_tablet->partition_id()); |
462 | 0 | auto compaction_job = job.add_compaction(); |
463 | 0 | compaction_job->set_id(_uuid); |
464 | 0 | compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + |
465 | 0 | std::to_string(config::heartbeat_service_port)); |
466 | 0 | compaction_job->set_type(cloud::TabletCompactionJobPB::BASE); |
467 | 0 | if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
468 | 0 | _tablet->enable_unique_key_merge_on_write()) { |
469 | 0 | compaction_job->set_delete_bitmap_lock_initiator(this->initiator()); |
470 | 0 | } |
471 | 0 | auto st = _engine.meta_mgr().abort_tablet_job(job); |
472 | 0 | if (!st.ok()) { |
473 | 0 | LOG_WARNING("failed to abort compaction job") |
474 | 0 | .tag("job_id", _uuid) |
475 | 0 | .tag("tablet_id", _tablet->tablet_id()) |
476 | 0 | .error(st); |
477 | 0 | } |
478 | 0 | return st; |
479 | 0 | } |
480 | | |
481 | 0 | void CloudBaseCompaction::do_lease() { |
482 | 0 | cloud::TabletJobInfoPB job; |
483 | 0 | if (_state == CompactionState::SUCCESS) { |
484 | 0 | return; |
485 | 0 | } |
486 | 0 | auto idx = job.mutable_idx(); |
487 | 0 | idx->set_tablet_id(_tablet->tablet_id()); |
488 | 0 | idx->set_table_id(_tablet->table_id()); |
489 | 0 | idx->set_index_id(_tablet->index_id()); |
490 | 0 | idx->set_partition_id(_tablet->partition_id()); |
491 | 0 | auto compaction_job = job.add_compaction(); |
492 | 0 | compaction_job->set_id(_uuid); |
493 | 0 | using namespace std::chrono; |
494 | 0 | int64_t lease_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count() + |
495 | 0 | config::lease_compaction_interval_seconds * 4; |
496 | 0 | compaction_job->set_lease(lease_time); |
497 | 0 | auto st = _engine.meta_mgr().lease_tablet_job(job); |
498 | 0 | if (!st.ok()) { |
499 | 0 | LOG_WARNING("failed to lease compaction job") |
500 | 0 | .tag("job_id", _uuid) |
501 | 0 | .tag("tablet_id", _tablet->tablet_id()) |
502 | 0 | .error(st); |
503 | 0 | } |
504 | 0 | } |
505 | | |
506 | | } // namespace doris |