be/src/cloud/cloud_rowset_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_rowset_builder.h" |
19 | | |
20 | | #include "cloud/cloud_meta_mgr.h" |
21 | | #include "cloud/cloud_storage_engine.h" |
22 | | #include "cloud/cloud_tablet.h" |
23 | | #include "cloud/cloud_tablet_mgr.h" |
24 | | #include "storage/storage_policy.h" |
25 | | |
26 | | namespace doris { |
27 | | #include "common/compile_check_begin.h" |
28 | | using namespace ErrorCode; |
29 | | |
30 | | CloudRowsetBuilder::CloudRowsetBuilder(CloudStorageEngine& engine, const WriteRequest& req, |
31 | | RuntimeProfile* profile) |
32 | 273k | : BaseRowsetBuilder(req, profile), _engine(engine) {} |
33 | | |
34 | 273k | CloudRowsetBuilder::~CloudRowsetBuilder() { |
35 | | // Clear file cache immediately when load fails |
36 | 273k | if (_is_init && _rowset != nullptr && _rowset->rowset_meta()->rowset_state() == PREPARED) { |
37 | 0 | _rowset->clear_cache(); |
38 | 0 | } |
39 | 273k | } |
40 | | |
41 | 180k | Status CloudRowsetBuilder::init() { |
42 | 180k | _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id)); |
43 | | |
44 | 180k | std::shared_ptr<MowContext> mow_context; |
45 | 180k | if (_tablet->enable_unique_key_merge_on_write()) { |
46 | 55.4k | auto st = std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(); |
47 | | // sync_rowsets will return INVALID_TABLET_STATE when tablet is under alter |
48 | 55.4k | if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) { |
49 | 0 | return st; |
50 | 0 | } |
51 | 55.4k | RETURN_IF_ERROR(init_mow_context(mow_context)); |
52 | 55.4k | } |
53 | 180k | RETURN_IF_ERROR(check_tablet_version_count()); |
54 | | |
55 | 180k | using namespace std::chrono; |
56 | 180k | std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms = |
57 | 180k | duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
58 | | |
59 | | // build tablet schema in request level |
60 | 180k | RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), |
61 | 180k | *_tablet->tablet_schema())); |
62 | | |
63 | 180k | RowsetWriterContext context; |
64 | 180k | context.txn_id = _req.txn_id; |
65 | 180k | context.txn_expiration = _req.txn_expiration; |
66 | 180k | context.load_id = _req.load_id; |
67 | 180k | context.rowset_state = PREPARED; |
68 | 180k | context.segments_overlap = OVERLAPPING; |
69 | 180k | context.tablet_schema = _tablet_schema; |
70 | 180k | context.newest_write_timestamp = UnixSeconds(); |
71 | 180k | context.tablet_id = _req.tablet_id; |
72 | 180k | context.index_id = _req.index_id; |
73 | 180k | context.tablet = _tablet; |
74 | 180k | context.write_type = DataWriteType::TYPE_DIRECT; |
75 | 180k | context.mow_context = mow_context; |
76 | 180k | context.write_file_cache = _req.write_file_cache; |
77 | 180k | context.partial_update_info = _partial_update_info; |
78 | 180k | context.file_cache_ttl_sec = _tablet->ttl_seconds(); |
79 | 180k | context.storage_resource = _engine.get_storage_resource(_req.storage_vault_id); |
80 | 180k | if (!context.storage_resource) { |
81 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
82 | 0 | _req.storage_vault_id); |
83 | 0 | } |
84 | | |
85 | 180k | _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); |
86 | | |
87 | 180k | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); |
88 | | |
89 | 180k | if (!_skip_writing_rowset_metadata) { |
90 | 54.6k | RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); |
91 | 54.6k | } |
92 | | |
93 | 180k | _is_init = true; |
94 | 180k | return Status::OK(); |
95 | 180k | } |
96 | | |
97 | 180k | Status CloudRowsetBuilder::check_tablet_version_count() { |
98 | 180k | int64_t version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0); |
99 | | // TODO(plat1ko): load backoff algorithm |
100 | 180k | int32_t max_version_config = cloud_tablet()->max_version_config(); |
101 | 180k | if (version_count > max_version_config) { |
102 | 0 | return Status::Error<TOO_MANY_VERSION>( |
103 | 0 | "failed to init rowset builder. version count: {}, exceed limit: {}, " |
104 | 0 | "tablet: {}. Please reduce the frequency of loading data or adjust the " |
105 | 0 | "max_tablet_version_num or time_series_max_tablet_version_numin be.conf to a " |
106 | 0 | "larger value.", |
107 | 0 | version_count, max_version_config, _tablet->tablet_id()); |
108 | 0 | } |
109 | 180k | return Status::OK(); |
110 | 180k | } |
111 | | |
112 | 180k | void CloudRowsetBuilder::update_tablet_stats() { |
113 | 180k | auto* tablet = cloud_tablet(); |
114 | 180k | DCHECK(tablet); |
115 | 180k | DCHECK(_rowset); |
116 | 180k | tablet->fetch_add_approximate_num_rowsets(1); |
117 | 180k | tablet->fetch_add_approximate_num_segments(_rowset->num_segments()); |
118 | 180k | tablet->fetch_add_approximate_num_rows(_rowset->num_rows()); |
119 | 180k | tablet->fetch_add_approximate_data_size(_rowset->total_disk_size()); |
120 | 180k | tablet->fetch_add_approximate_cumu_num_rowsets(1); |
121 | 180k | tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments()); |
122 | 180k | tablet->write_count.fetch_add(1, std::memory_order_relaxed); |
123 | 180k | } |
124 | | |
125 | 542k | CloudTablet* CloudRowsetBuilder::cloud_tablet() { |
126 | 542k | return static_cast<CloudTablet*>(_tablet.get()); |
127 | 542k | } |
128 | | |
129 | 54.4k | const RowsetMetaSharedPtr& CloudRowsetBuilder::rowset_meta() { |
130 | 54.4k | return _rowset_writer->rowset_meta(); |
131 | 54.4k | } |
132 | | |
133 | 180k | Status CloudRowsetBuilder::set_txn_related_delete_bitmap() { |
134 | 180k | if (_tablet->enable_unique_key_merge_on_write()) { |
135 | | // For empty rowsets when skip_writing_empty_rowset_metadata=true, |
136 | | // store only a lightweight marker instead of full rowset info. |
137 | | // This allows CalcDeleteBitmapTask to detect and skip gracefully, |
138 | | // while using minimal memory (~16 bytes per entry). |
139 | 55.5k | if (_skip_writing_rowset_metadata) { |
140 | 32.6k | _engine.txn_delete_bitmap_cache().mark_empty_rowset(_req.txn_id, _tablet->tablet_id(), |
141 | 32.6k | _req.txn_expiration); |
142 | 32.6k | return Status::OK(); |
143 | 32.6k | } |
144 | 22.8k | if (config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) { |
145 | 22.8k | auto st = _tablet->check_delete_bitmap_correctness( |
146 | 22.8k | _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, *_rowset_ids); |
147 | 22.8k | if (!st.ok()) { |
148 | 0 | LOG(WARNING) << fmt::format( |
149 | 0 | "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " |
150 | 0 | "delete bitmap correctness check failed in commit phase!", |
151 | 0 | _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), |
152 | 0 | _req.partition_id); |
153 | 0 | return st; |
154 | 0 | } |
155 | 22.8k | } |
156 | 22.8k | _engine.txn_delete_bitmap_cache().set_tablet_txn_info( |
157 | 22.8k | _req.txn_id, _tablet->tablet_id(), _delete_bitmap, *_rowset_ids, _rowset, |
158 | 22.8k | _req.txn_expiration, _partial_update_info); |
159 | 22.8k | } |
160 | 148k | return Status::OK(); |
161 | 180k | } |
162 | | #include "common/compile_check_end.h" |
163 | | } // namespace doris |