be/src/cloud/cloud_rowset_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_rowset_builder.h" |
19 | | |
20 | | #include "cloud/cloud_meta_mgr.h" |
21 | | #include "cloud/cloud_storage_engine.h" |
22 | | #include "cloud/cloud_tablet.h" |
23 | | #include "cloud/cloud_tablet_mgr.h" |
24 | | #include "storage/storage_policy.h" |
25 | | |
26 | | namespace doris { |
27 | | #include "common/compile_check_begin.h" |
28 | | using namespace ErrorCode; |
29 | | |
30 | | CloudRowsetBuilder::CloudRowsetBuilder(CloudStorageEngine& engine, const WriteRequest& req, |
31 | | RuntimeProfile* profile) |
32 | 270k | : BaseRowsetBuilder(req, profile), _engine(engine) {} |
33 | | |
34 | 270k | CloudRowsetBuilder::~CloudRowsetBuilder() { |
35 | | // Clear file cache immediately when load fails |
36 | 270k | if (_is_init && _rowset != nullptr && _rowset->rowset_meta()->rowset_state() == PREPARED) { |
37 | 0 | _rowset->clear_cache(); |
38 | 0 | } |
39 | 270k | } |
40 | | |
41 | 179k | Status CloudRowsetBuilder::init() { |
42 | 179k | _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id)); |
43 | | |
44 | 179k | std::shared_ptr<MowContext> mow_context; |
45 | 179k | if (_tablet->enable_unique_key_merge_on_write()) { |
46 | 55.4k | if (config::cloud_mow_sync_rowsets_when_load_txn_begin) { |
47 | 0 | auto st = std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(); |
48 | | // sync_rowsets will return INVALID_TABLET_STATE when tablet is under alter |
49 | 0 | if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) { |
50 | 0 | return st; |
51 | 0 | } |
52 | 0 | } |
53 | 55.4k | RETURN_IF_ERROR(init_mow_context(mow_context)); |
54 | 55.4k | } |
55 | 179k | RETURN_IF_ERROR(check_tablet_version_count()); |
56 | | |
57 | 179k | using namespace std::chrono; |
58 | 179k | std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms = |
59 | 179k | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
60 | | |
61 | | // build tablet schema in request level |
62 | 179k | RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), |
63 | 179k | *_tablet->tablet_schema())); |
64 | | |
65 | 179k | RowsetWriterContext context; |
66 | 179k | context.txn_id = _req.txn_id; |
67 | 179k | context.txn_expiration = _req.txn_expiration; |
68 | 179k | context.load_id = _req.load_id; |
69 | 179k | context.rowset_state = PREPARED; |
70 | 179k | context.segments_overlap = OVERLAPPING; |
71 | 179k | context.tablet_schema = _tablet_schema; |
72 | 179k | context.newest_write_timestamp = UnixSeconds(); |
73 | 179k | context.tablet_id = _req.tablet_id; |
74 | 179k | context.index_id = _req.index_id; |
75 | 179k | context.tablet = _tablet; |
76 | 179k | context.write_type = DataWriteType::TYPE_DIRECT; |
77 | 179k | context.mow_context = mow_context; |
78 | 179k | context.write_file_cache = _req.write_file_cache; |
79 | 179k | context.partial_update_info = _partial_update_info; |
80 | 179k | context.file_cache_ttl_sec = _tablet->ttl_seconds(); |
81 | 179k | context.storage_resource = _engine.get_storage_resource(_req.storage_vault_id); |
82 | 179k | if (!context.storage_resource) { |
83 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
84 | 0 | _req.storage_vault_id); |
85 | 0 | } |
86 | | |
87 | 179k | _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); |
88 | | |
89 | 179k | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); |
90 | | |
91 | 179k | if (!_skip_writing_rowset_metadata) { |
92 | 54.6k | RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); |
93 | 54.6k | } |
94 | | |
95 | 179k | _is_init = true; |
96 | 179k | return Status::OK(); |
97 | 179k | } |
98 | | |
99 | 178k | Status CloudRowsetBuilder::check_tablet_version_count() { |
100 | 178k | int64_t version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0); |
101 | | // TODO(plat1ko): load backoff algorithm |
102 | 178k | int32_t max_version_config = cloud_tablet()->max_version_config(); |
103 | 178k | if (version_count > max_version_config) { |
104 | 0 | return Status::Error<TOO_MANY_VERSION>( |
105 | 0 | "failed to init rowset builder. version count: {}, exceed limit: {}, " |
106 | 0 | "tablet: {}. Please reduce the frequency of loading data or adjust the " |
107 | 0 | "max_tablet_version_num or time_series_max_tablet_version_numin be.conf to a " |
108 | 0 | "larger value.", |
109 | 0 | version_count, max_version_config, _tablet->tablet_id()); |
110 | 0 | } |
111 | 178k | return Status::OK(); |
112 | 178k | } |
113 | | |
114 | 179k | void CloudRowsetBuilder::update_tablet_stats() { |
115 | 179k | auto* tablet = cloud_tablet(); |
116 | 179k | DCHECK(tablet); |
117 | 179k | DCHECK(_rowset); |
118 | 179k | tablet->fetch_add_approximate_num_rowsets(1); |
119 | 179k | tablet->fetch_add_approximate_num_segments(_rowset->num_segments()); |
120 | 179k | tablet->fetch_add_approximate_num_rows(_rowset->num_rows()); |
121 | 179k | tablet->fetch_add_approximate_data_size(_rowset->total_disk_size()); |
122 | 179k | tablet->fetch_add_approximate_cumu_num_rowsets(1); |
123 | 179k | tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments()); |
124 | 179k | tablet->write_count.fetch_add(1, std::memory_order_relaxed); |
125 | 179k | } |
126 | | |
127 | 537k | CloudTablet* CloudRowsetBuilder::cloud_tablet() { |
128 | 537k | return static_cast<CloudTablet*>(_tablet.get()); |
129 | 537k | } |
130 | | |
131 | 86.2k | const RowsetMetaSharedPtr& CloudRowsetBuilder::rowset_meta() { |
132 | 86.2k | return _rowset_writer->rowset_meta(); |
133 | 86.2k | } |
134 | | |
135 | 179k | Status CloudRowsetBuilder::set_txn_related_info() { |
136 | 179k | if (_tablet->enable_unique_key_merge_on_write()) { |
137 | | // For empty rowsets when skip_writing_empty_rowset_metadata=true, |
138 | | // store only a lightweight marker instead of full rowset info. |
139 | | // This allows CalcDeleteBitmapTask to detect and skip gracefully, |
140 | | // while using minimal memory (~16 bytes per entry). |
141 | 55.5k | if (_skip_writing_rowset_metadata) { |
142 | 32.6k | _engine.txn_delete_bitmap_cache().mark_empty_rowset(_req.txn_id, _tablet->tablet_id(), |
143 | 32.6k | _req.txn_expiration); |
144 | 32.6k | return Status::OK(); |
145 | 32.6k | } |
146 | 22.8k | if (config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) { |
147 | 22.8k | auto st = _tablet->check_delete_bitmap_correctness( |
148 | 22.8k | _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, *_rowset_ids); |
149 | 22.8k | if (!st.ok()) { |
150 | 0 | LOG(WARNING) << fmt::format( |
151 | 0 | "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " |
152 | 0 | "delete bitmap correctness check failed in commit phase!", |
153 | 0 | _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), |
154 | 0 | _req.partition_id); |
155 | 0 | return st; |
156 | 0 | } |
157 | 22.8k | } |
158 | 22.8k | _engine.txn_delete_bitmap_cache().set_tablet_txn_info( |
159 | 22.8k | _req.txn_id, _tablet->tablet_id(), _delete_bitmap, *_rowset_ids, _rowset, |
160 | 22.8k | _req.txn_expiration, _partial_update_info); |
161 | 123k | } else { |
162 | 123k | if (config::enable_cloud_make_rs_visible_on_be) { |
163 | 123k | if (_skip_writing_rowset_metadata) { |
164 | 92.1k | _engine.committed_rs_mgr().mark_empty_rowset(_req.txn_id, _tablet->tablet_id(), |
165 | 92.1k | _req.txn_expiration); |
166 | 92.1k | } else { |
167 | 31.7k | _engine.meta_mgr().cache_committed_rowset(rowset_meta(), _req.txn_expiration); |
168 | 31.7k | } |
169 | 123k | } |
170 | 123k | } |
171 | 146k | return Status::OK(); |
172 | 179k | } |
173 | | #include "common/compile_check_end.h" |
174 | | } // namespace doris |