be/src/storage/compaction/cold_data_compaction.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/compaction/cold_data_compaction.h" |
19 | | |
20 | | #include <stdint.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <memory> |
24 | | #include <mutex> |
25 | | #include <shared_mutex> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/compiler_util.h" // IWYU pragma: keep |
30 | | #include "common/config.h" |
31 | | #include "common/status.h" |
32 | | #include "io/fs/remote_file_system.h" |
33 | | #include "runtime/thread_context.h" |
34 | | #include "storage/compaction/compaction.h" |
35 | | #include "storage/olap_common.h" |
36 | | #include "storage/rowset/rowset.h" |
37 | | #include "storage/rowset/rowset_writer_context.h" |
38 | | #include "storage/storage_policy.h" |
39 | | #include "storage/tablet/tablet.h" |
40 | | #include "storage/tablet/tablet_meta.h" |
41 | | #include "util/thread.h" |
42 | | #include "util/trace.h" |
43 | | #include "util/uid_util.h" |
44 | | |
45 | | namespace doris { |
46 | | using namespace ErrorCode; |
47 | | |
// Constructs a cold-data compaction task for `tablet`.
// `engine` and `tablet` are forwarded unchanged to the CompactionMixin base; the third
// base-constructor argument is the string "ColdDataCompaction:<tablet_id>".
// NOTE(review): that string is presumably a task / memory-tracker label -- confirm against
// the CompactionMixin constructor.
48 | | ColdDataCompaction::ColdDataCompaction(StorageEngine& engine, const TabletSharedPtr& tablet) |
49 | 0 | : CompactionMixin(engine, tablet, |
50 | 0 | "ColdDataCompaction:" + std::to_string(tablet->tablet_id())) {} |
51 | | |
// Defaulted destructor, defined out of line in this translation unit; it adds no cleanup
// logic of its own.
52 | 0 | ColdDataCompaction::~ColdDataCompaction() = default; |
53 | | |
// Prepares the compaction.
// Returns an INVALID_ARGUMENT error ("_tablet init failed") when the tablet reports that its
// initialization did not succeed; otherwise returns whatever pick_rowsets_to_compact() returns.
54 | 0 | Status ColdDataCompaction::prepare_compact() { |
55 | 0 | if (UNLIKELY(!tablet()->init_succeeded())) { |
56 | 0 | return Status::Error<INVALID_ARGUMENT>("_tablet init failed"); |
57 | 0 | } |
58 | 0 | return pick_rowsets_to_compact(); |
59 | 0 | } |
60 | | |
// Runs the compaction via CompactionMixin::execute_compact(), but only when this replica is
// the tablet's configured cooldown replica.
// Returns Status::Aborted when the replica ids differ, the base-class error when the base
// execution fails, and Status::OK() otherwise.
61 | 0 | Status ColdDataCompaction::execute_compact() { |
// Idle scheduling is compiled out on Apple platforms and is opt-in through configuration.
// NOTE(review): the flag is named for base compaction (enable_base_compaction_idle_sched);
// confirm that reusing it for cold-data compaction is intended.
62 | 0 | #ifndef __APPLE__ |
63 | 0 | if (config::enable_base_compaction_idle_sched) { |
64 | 0 | Thread::set_idle_sched(); |
65 | 0 | } |
66 | 0 | #endif |
67 | 0 | SCOPED_ATTACH_TASK(_mem_tracker); |
// The shared lock is a function-scope local, so it stays held across the replica check and
// the whole base-class execute_compact() call below.
68 | 0 | std::shared_lock cooldown_conf_rlock(tablet()->get_cooldown_conf_lock()); |
69 | 0 | if (tablet()->cooldown_conf_unlocked().cooldown_replica_id != tablet()->replica_id()) { |
70 | 0 | return Status::Aborted<false>("this replica is not cooldown replica"); |
71 | 0 | } |
72 | 0 | RETURN_IF_ERROR(CompactionMixin::execute_compact()); |
// Debug-only assertion: once the base call has returned without error, _state is expected
// to be SUCCESS.
73 | 0 | DCHECK_EQ(_state, CompactionState::SUCCESS); |
74 | 0 | return Status::OK(); |
75 | 0 | } |
76 | | |
// Points the output rowset writer at the storage resource selected by the tablet's storage
// policy, then delegates to CompactionMixin::construct_output_rowset_writer().
// `ctx` is modified in place: its storage_resource member is overwritten.
// NOTE(review): DORIS_TRY presumably returns the lookup error from this function when
// get_resource_by_storage_policy_id() fails -- confirm against the macro definition.
77 | 0 | Status ColdDataCompaction::construct_output_rowset_writer(RowsetWriterContext& ctx) { |
78 | | // write output rowset to storage policy resource |
79 | 0 | auto storage_resource = |
80 | 0 | DORIS_TRY(get_resource_by_storage_policy_id(tablet()->storage_policy_id())); |
81 | 0 | ctx.storage_resource = std::move(storage_resource); |
82 | 0 | return CompactionMixin::construct_output_rowset_writer(ctx); |
83 | 0 | } |
84 | | |
// Selects the compaction inputs: every rowset visited by traverse_rowsets() whose is_local()
// is false is appended to _input_rowsets. The list is then sorted with Rowset::comparator
// and the result of check_version_continuity() on it is returned.
// NOTE(review): _input_rowsets is appended to without being cleared first; this assumes the
// function runs once per compaction object -- confirm against callers.
85 | 0 | Status ColdDataCompaction::pick_rowsets_to_compact() { |
86 | 0 | tablet()->traverse_rowsets([this](const auto& rs) { |
87 | 0 | if (!rs->is_local()) { |
88 | 0 | _input_rowsets.push_back(rs); |
89 | 0 | } |
90 | 0 | }); |
91 | 0 | std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); |
92 | 0 | return check_version_continuity(_input_rowsets); |
93 | 0 | } |
94 | | |
// Swaps the compacted input rowsets for the output rowset in the tablet, assigns a freshly
// generated cooldown meta id, saves the tablet meta, and requests an asynchronous write of
// the cooldown meta.
// Returns the delete_rowsets() error if that call fails (nothing after it runs in that case);
// otherwise returns Status::OK().
95 | 0 | Status ColdDataCompaction::modify_rowsets() { |
96 | 0 | UniqueId cooldown_meta_id = UniqueId::gen_uid(); |
// Phase 1: the in-memory update runs under the exclusive header lock. The lock_guard releases
// the lock at the end of this scope, including on the early return from RETURN_IF_ERROR.
97 | 0 | { |
98 | 0 | std::lock_guard wlock(_tablet->get_header_lock()); |
99 | 0 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
100 | | // Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`. |
101 | 0 | RETURN_IF_ERROR(tablet()->delete_rowsets(_input_rowsets, false)); |
// NOTE(review): no result of add_rowsets() is inspected here; if it reports failure through
// its return value, that failure is dropped -- confirm its signature.
102 | 0 | tablet()->add_rowsets({_output_rowset}); |
103 | | // TODO(plat1ko): process primary key |
104 | 0 | _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id); |
105 | 0 | } |
// Phase 2: the meta is saved under a shared header lock. The exclusive lock above has already
// been released, so the header lock is not held between the two scopes.
// NOTE(review): no result of save_meta() is inspected here either -- confirm whether it can fail.
106 | 0 | { |
107 | 0 | std::shared_lock rlock(_tablet->get_header_lock()); |
108 | 0 | tablet()->save_meta(); |
109 | 0 | } |
110 | | // write remote tablet meta |
111 | 0 | Tablet::async_write_cooldown_meta(std::static_pointer_cast<Tablet>(_tablet)); |
112 | 0 | return Status::OK(); |
113 | 0 | } |
114 | | |
115 | | } // namespace doris |