be/src/cloud/cloud_schema_change_job.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_schema_change_job.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <gen_cpp/cloud.pb.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <chrono> |
25 | | #include <memory> |
26 | | #include <mutex> |
27 | | #include <random> |
28 | | #include <thread> |
29 | | |
30 | | #include "cloud/cloud_meta_mgr.h" |
31 | | #include "cloud/cloud_tablet_mgr.h" |
32 | | #include "common/status.h" |
33 | | #include "service/backend_options.h" |
34 | | #include "storage/delete/delete_handler.h" |
35 | | #include "storage/index/inverted/inverted_index_desc.h" |
36 | | #include "storage/olap_define.h" |
37 | | #include "storage/rowset/beta_rowset.h" |
38 | | #include "storage/rowset/rowset.h" |
39 | | #include "storage/rowset/rowset_factory.h" |
40 | | #include "storage/storage_engine.h" |
41 | | #include "storage/tablet/tablet.h" |
42 | | #include "storage/tablet/tablet_fwd.h" |
43 | | #include "storage/tablet/tablet_meta.h" |
44 | | #include "util/debug_points.h" |
45 | | |
46 | | namespace doris { |
47 | | using namespace ErrorCode; |
48 | | |
namespace {

// Batch size given to the rowset readers that scan the base tablet during a schema change.
constexpr int ALTER_TABLE_BATCH_SIZE = 4096;

// Lock id used for every delete bitmap update lock request issued by a schema change job.
constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;

} // namespace
51 | | |
52 | | std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting, |
53 | 10.6k | int64_t mem_limit) { |
54 | 10.6k | if (sc_sorting) { |
55 | 8.69k | return std::make_unique<VBaseSchemaChangeWithSorting>(changer, mem_limit); |
56 | 8.69k | } |
57 | | // else sc_directly |
58 | 1.93k | return std::make_unique<VSchemaChangeDirectly>(changer); |
59 | 10.6k | } |
60 | | |
61 | | CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine, |
62 | | std::string job_id, int64_t expiration) |
63 | 10.6k | : _cloud_storage_engine(cloud_storage_engine), |
64 | 10.6k | _job_id(std::move(job_id)), |
65 | 10.6k | _expiration(expiration) { |
66 | 10.6k | _initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) & |
67 | 10.6k | std::numeric_limits<int64_t>::max(); |
68 | 10.6k | } |
69 | | |
70 | 10.6k | CloudSchemaChangeJob::~CloudSchemaChangeJob() = default; |
71 | | |
72 | 10.6k | Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { |
73 | 10.6k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.block", DBUG_BLOCK); |
74 | | // new tablet has to exist |
75 | 10.6k | _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id)); |
76 | 10.6k | if (_new_tablet->tablet_state() == TABLET_RUNNING) { |
77 | 14 | LOG(INFO) << "schema change job has already finished. base_tablet_id=" |
78 | 14 | << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id |
79 | 14 | << ", alter_version=" << request.alter_version << ", job_id=" << _job_id; |
80 | 14 | return Status::OK(); |
81 | 14 | } |
82 | | |
83 | 10.6k | _base_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.base_tablet_id)); |
84 | | |
85 | 10.6k | static constexpr long TRY_LOCK_TIMEOUT = 30; |
86 | 10.6k | std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(), std::defer_lock); |
87 | 10.6k | bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT)); |
88 | | |
89 | 10.6k | _new_tablet->set_alter_failed(false); |
90 | 10.6k | Defer defer([this] { |
91 | | // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed. |
92 | 10.2k | if (_new_tablet->tablet_state() != TABLET_RUNNING) { |
93 | 78 | _new_tablet->set_alter_failed(true); |
94 | 78 | } |
95 | 10.2k | }); |
96 | | |
97 | 10.6k | if (!owns_lock) { |
98 | 0 | LOG(WARNING) << "Failed to obtain schema change lock, there might be inverted index being " |
99 | 0 | "built on base_tablet=" |
100 | 0 | << request.base_tablet_id; |
101 | 0 | return Status::Error<TRY_LOCK_FAILED>( |
102 | 0 | "Failed to obtain schema change lock, there might be inverted index being " |
103 | 0 | "built on base_tablet=", |
104 | 0 | request.base_tablet_id); |
105 | 0 | } |
106 | | // MUST sync rowsets before capturing rowset readers and building DeleteHandler |
107 | 10.6k | SyncOptions options; |
108 | 10.6k | options.query_version = request.alter_version; |
109 | 10.6k | RETURN_IF_ERROR(_base_tablet->sync_rowsets(options)); |
110 | | // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1] |
111 | 10.6k | _output_cumulative_point = _base_tablet->cumulative_layer_point(); |
112 | 10.6k | std::vector<RowSetSplits> rs_splits; |
113 | 10.6k | int64_t base_max_version = _base_tablet->max_version_unlocked(); |
114 | 10.6k | cloud::TabletJobInfoPB job; |
115 | 10.6k | auto* idx = job.mutable_idx(); |
116 | 10.6k | idx->set_tablet_id(_base_tablet->tablet_id()); |
117 | 10.6k | idx->set_table_id(_base_tablet->table_id()); |
118 | 10.6k | idx->set_index_id(_base_tablet->index_id()); |
119 | 10.6k | idx->set_partition_id(_base_tablet->partition_id()); |
120 | 10.6k | auto* sc_job = job.mutable_schema_change(); |
121 | 10.6k | sc_job->set_id(_job_id); |
122 | 10.6k | sc_job->set_initiator(BackendOptions::get_localhost() + ':' + |
123 | 10.6k | std::to_string(config::heartbeat_service_port)); |
124 | 10.6k | sc_job->set_alter_version(base_max_version); |
125 | 10.6k | auto* new_tablet_idx = sc_job->mutable_new_tablet_idx(); |
126 | 10.6k | new_tablet_idx->set_tablet_id(_new_tablet->tablet_id()); |
127 | 10.6k | new_tablet_idx->set_table_id(_new_tablet->table_id()); |
128 | 10.6k | new_tablet_idx->set_index_id(_new_tablet->index_id()); |
129 | 10.6k | new_tablet_idx->set_partition_id(_new_tablet->partition_id()); |
130 | 10.6k | cloud::StartTabletJobResponse start_resp; |
131 | 10.6k | auto st = _cloud_storage_engine.meta_mgr().prepare_tablet_job(job, &start_resp); |
132 | 10.6k | if (!st.ok()) { |
133 | 0 | if (start_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { |
134 | 0 | st = _new_tablet->sync_rowsets(); |
135 | 0 | if (!st.ok()) { |
136 | 0 | LOG_WARNING("failed to sync new tablet") |
137 | 0 | .tag("tablet_id", _new_tablet->tablet_id()) |
138 | 0 | .error(st); |
139 | 0 | } |
140 | 0 | return Status::OK(); |
141 | 0 | } |
142 | 0 | return st; |
143 | 0 | } |
144 | 10.6k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", { |
145 | 10.6k | auto res = |
146 | 10.6k | Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}", |
147 | 10.6k | request.base_tablet_id, request.new_tablet_id); |
148 | 10.6k | LOG(WARNING) << "inject error. res=" << res; |
149 | 10.6k | return res; |
150 | 10.6k | }); |
151 | 10.6k | if (request.alter_version > 1) { |
152 | | // [0-1] is a placeholder rowset, no need to convert |
153 | 5.19k | RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()}, |
154 | 5.19k | &rs_splits, |
155 | 5.19k | {.skip_missing_versions = false, |
156 | 5.19k | .enable_prefer_cached_rowset = false, |
157 | 5.19k | .query_freshness_tolerance_ms = -1})); |
158 | 5.19k | } |
159 | 10.6k | Defer defer2 {[&]() { |
160 | 10.4k | _new_tablet->set_alter_version(-1); |
161 | 10.4k | _base_tablet->set_alter_version(-1); |
162 | 10.4k | }}; |
163 | 10.6k | _new_tablet->set_alter_version(start_resp.alter_version()); |
164 | 10.6k | _base_tablet->set_alter_version(start_resp.alter_version()); |
165 | 10.6k | LOG(INFO) << "Begin to alter tablet. base_tablet_id=" << request.base_tablet_id |
166 | 10.6k | << ", new_tablet_id=" << request.new_tablet_id |
167 | 10.6k | << ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id; |
168 | 10.6k | sc_job->set_alter_version(start_resp.alter_version()); |
169 | | |
170 | | // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert. |
171 | | |
172 | | // Create a new tablet schema, should merge with dropped columns in light weight schema change |
173 | 10.6k | _base_tablet_schema = std::make_shared<TabletSchema>(); |
174 | 10.6k | _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns); |
175 | 10.6k | _new_tablet_schema = _new_tablet->tablet_schema(); |
176 | | |
177 | 10.6k | std::vector<ColumnId> return_columns; |
178 | 10.6k | return_columns.resize(_base_tablet_schema->num_columns()); |
179 | 10.6k | std::iota(return_columns.begin(), return_columns.end(), 0); |
180 | | |
181 | | // delete handlers to filter out deleted rows |
182 | 10.6k | DeleteHandler delete_handler; |
183 | 10.6k | std::vector<RowsetMetaSharedPtr> delete_predicates; |
184 | 10.6k | for (auto& split : rs_splits) { |
185 | 8.90k | auto& rs_meta = split.rs_reader->rowset()->rowset_meta(); |
186 | 8.90k | if (rs_meta->has_delete_predicate()) { |
187 | 54 | _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema()); |
188 | 54 | delete_predicates.push_back(rs_meta); |
189 | 54 | } |
190 | 8.90k | } |
191 | 10.6k | RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, |
192 | 10.6k | start_resp.alter_version())); |
193 | | |
194 | | // reader_context is stack variables, it's lifetime MUST keep the same with rs_readers |
195 | 10.6k | RowsetReaderContext reader_context; |
196 | 10.6k | reader_context.reader_type = ReaderType::READER_ALTER_TABLE; |
197 | 10.6k | reader_context.tablet_schema = _base_tablet_schema; |
198 | 10.6k | reader_context.need_ordered_result = true; |
199 | 10.6k | reader_context.delete_handler = &delete_handler; |
200 | 10.6k | reader_context.return_columns = &return_columns; |
201 | 10.6k | reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); |
202 | 10.6k | reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; |
203 | 10.6k | reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; |
204 | 10.6k | reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr(); |
205 | 10.6k | reader_context.version = Version(0, start_resp.alter_version()); |
206 | 10.6k | std::vector<uint32_t> cluster_key_idxes; |
207 | 10.6k | if (!_base_tablet_schema->cluster_key_uids().empty()) { |
208 | 699 | for (const auto& uid : _base_tablet_schema->cluster_key_uids()) { |
209 | 699 | cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid)); |
210 | 699 | } |
211 | 132 | reader_context.read_orderby_key_columns = &cluster_key_idxes; |
212 | 132 | reader_context.is_unique = false; |
213 | 132 | reader_context.sequence_id_idx = -1; |
214 | 132 | } |
215 | | |
216 | 10.6k | for (auto& split : rs_splits) { |
217 | 8.88k | RETURN_IF_ERROR(split.rs_reader->init(&reader_context)); |
218 | 8.88k | } |
219 | | |
220 | 10.6k | SchemaChangeParams sc_params; |
221 | | |
222 | | // cache schema change output to file cache |
223 | 10.6k | std::vector<RowsetSharedPtr> rowsets; |
224 | 10.6k | rowsets.resize(rs_splits.size()); |
225 | 10.6k | std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(), |
226 | 10.6k | [](RowSetSplits& split) { return split.rs_reader->rowset(); }); |
227 | 10.6k | sc_params.output_to_file_cache = _should_cache_sc_output(rowsets); |
228 | 10.6k | if (request.__isset.query_globals && request.__isset.query_options) { |
229 | 10.6k | sc_params.runtime_state = |
230 | 10.6k | std::make_shared<RuntimeState>(request.query_options, request.query_globals); |
231 | 10.6k | } else { |
232 | | // for old version request compatibility |
233 | 2 | sc_params.runtime_state = std::make_shared<RuntimeState>(); |
234 | 2 | } |
235 | | |
236 | 10.6k | RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl)); |
237 | 10.6k | sc_params.ref_rowset_readers.reserve(rs_splits.size()); |
238 | 10.6k | for (RowSetSplits& split : rs_splits) { |
239 | 8.87k | sc_params.ref_rowset_readers.emplace_back(std::move(split.rs_reader)); |
240 | 8.87k | } |
241 | 10.6k | sc_params.delete_handler = &delete_handler; |
242 | 10.6k | sc_params.be_exec_version = request.be_exec_version; |
243 | 10.6k | DCHECK(request.__isset.alter_tablet_type); |
244 | 10.6k | switch (request.alter_tablet_type) { |
245 | 2.01k | case TAlterTabletType::SCHEMA_CHANGE: |
246 | 2.01k | sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE; |
247 | 2.01k | break; |
248 | 8.53k | case TAlterTabletType::ROLLUP: |
249 | 8.53k | sc_params.alter_tablet_type = AlterTabletType::ROLLUP; |
250 | 8.53k | break; |
251 | 0 | case TAlterTabletType::MIGRATION: |
252 | 0 | sc_params.alter_tablet_type = AlterTabletType::MIGRATION; |
253 | 0 | break; |
254 | 10.6k | } |
255 | 10.5k | sc_params.vault_id = request.storage_vault_id; |
256 | 10.5k | if (!request.__isset.materialized_view_params) { |
257 | 3.30k | return _convert_historical_rowsets(sc_params, job); |
258 | 3.30k | } |
259 | 27.8k | for (auto item : request.materialized_view_params) { |
260 | 27.8k | AlterMaterializedViewParam mv_param; |
261 | 27.8k | mv_param.column_name = item.column_name; |
262 | | /* |
263 | | * origin_column_name is always be set now, |
264 | | * but origin_column_name may be not set in some materialized view function. eg:count(1) |
265 | | */ |
266 | 27.8k | if (item.__isset.origin_column_name) { |
267 | 0 | mv_param.origin_column_name = item.origin_column_name; |
268 | 0 | } |
269 | | |
270 | 27.9k | if (item.__isset.mv_expr) { |
271 | 27.9k | mv_param.expr = std::make_shared<TExpr>(item.mv_expr); |
272 | 27.9k | } |
273 | 27.8k | sc_params.materialized_params_map.insert( |
274 | 27.8k | std::make_pair(to_lower(item.column_name), mv_param)); |
275 | 27.8k | } |
276 | 7.24k | sc_params.enable_unique_key_merge_on_write = _new_tablet->enable_unique_key_merge_on_write(); |
277 | 7.24k | return _convert_historical_rowsets(sc_params, job); |
278 | 10.5k | } |
279 | | |
280 | | Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params, |
281 | 10.5k | cloud::TabletJobInfoPB& job) { |
282 | 10.5k | LOG(INFO) << "Begin to convert historical rowsets for new_tablet from base_tablet. base_tablet=" |
283 | 10.5k | << _base_tablet->tablet_id() << ", new_tablet=" << _new_tablet->tablet_id() |
284 | 10.5k | << ", job_id=" << _job_id; |
285 | | |
286 | | // Add filter information in change, and filter column information will be set in _parse_request |
287 | | // And filter some data every time the row block changes |
288 | 10.5k | BlockChanger changer(_new_tablet->tablet_schema(), *sc_params.desc_tbl, |
289 | 10.5k | sc_params.runtime_state); |
290 | | |
291 | 10.5k | bool sc_sorting = false; |
292 | 10.5k | bool sc_directly = false; |
293 | | |
294 | | // 1. Parse the Alter request and convert it into an internal representation |
295 | 10.5k | RETURN_IF_ERROR(SchemaChangeJob::parse_request(sc_params, _base_tablet_schema.get(), |
296 | 10.5k | _new_tablet_schema.get(), &changer, &sc_sorting, |
297 | 10.5k | &sc_directly)); |
298 | 10.5k | if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) { |
299 | 0 | LOG(INFO) << "Don't support to add materialized view by linked schema change"; |
300 | 0 | return Status::InternalError( |
301 | 0 | "Don't support to add materialized view by linked schema change"); |
302 | 0 | } |
303 | | |
304 | 10.5k | LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting |
305 | 10.5k | << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id() |
306 | 10.5k | << ", new_tablet=" << _new_tablet->tablet_id(); |
307 | | |
308 | | // 2. Generate historical data converter |
309 | 10.5k | auto sc_procedure = get_sc_procedure( |
310 | 10.5k | changer, sc_sorting, |
311 | 10.5k | _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); |
312 | | |
313 | 10.5k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK); |
314 | | |
315 | | // 3. Convert historical data |
316 | 10.5k | bool already_exist_any_version = false; |
317 | 10.5k | for (const auto& rs_reader : sc_params.ref_rowset_readers) { |
318 | 8.72k | VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version(); |
319 | | |
320 | 8.72k | RowsetWriterContext context; |
321 | 8.72k | context.txn_id = rs_reader->rowset()->txn_id(); |
322 | 8.72k | context.txn_expiration = _expiration; |
323 | 8.72k | context.version = rs_reader->version(); |
324 | 8.72k | context.rowset_state = VISIBLE; |
325 | 8.72k | context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); |
326 | 8.72k | context.tablet_schema = _new_tablet->tablet_schema(); |
327 | 8.72k | context.newest_write_timestamp = rs_reader->newest_write_timestamp(); |
328 | 8.72k | context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id); |
329 | 8.72k | context.job_id = _job_id; |
330 | 8.72k | context.write_file_cache = sc_params.output_to_file_cache; |
331 | 8.72k | context.tablet = _new_tablet; |
332 | 8.72k | if (!context.storage_resource) { |
333 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
334 | 0 | sc_params.vault_id); |
335 | 0 | } |
336 | | |
337 | 8.72k | context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; |
338 | | // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index |
339 | 8.72k | bool vertical = false; |
340 | 8.72k | if (sc_sorting && !_new_tablet->tablet_schema()->cluster_key_uids().empty()) { |
341 | | // see VBaseSchemaChangeWithSorting::_external_sorting |
342 | 0 | vertical = true; |
343 | 0 | } |
344 | 8.72k | auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, vertical)); |
345 | | |
346 | 8.72k | RowsetMetaSharedPtr existed_rs_meta; |
347 | 8.72k | auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), |
348 | 8.72k | _job_id, &existed_rs_meta); |
349 | 8.72k | if (!st.ok()) { |
350 | 5 | if (st.is<ALREADY_EXIST>()) { |
351 | 4 | LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet " |
352 | 4 | << _new_tablet->tablet_id(); |
353 | | // Add already committed rowset to _output_rowsets. |
354 | 4 | DCHECK(existed_rs_meta != nullptr); |
355 | 4 | RowsetSharedPtr rowset; |
356 | | // schema is nullptr implies using RowsetMeta.tablet_schema |
357 | 4 | RETURN_IF_ERROR( |
358 | 4 | RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset)); |
359 | 4 | _output_rowsets.push_back(std::move(rowset)); |
360 | 4 | already_exist_any_version = true; |
361 | 4 | continue; |
362 | 4 | } else { |
363 | 1 | return st; |
364 | 1 | } |
365 | 5 | } |
366 | | |
367 | 8.72k | st = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet, |
368 | 8.72k | _base_tablet_schema, _new_tablet_schema); |
369 | 8.72k | if (!st.ok()) { |
370 | 77 | return Status::InternalError( |
371 | 77 | "failed to process schema change on rowset, version=[{}-{}], status={}", |
372 | 77 | rs_reader->version().first, rs_reader->version().second, st.to_string()); |
373 | 77 | } |
374 | | |
375 | 8.64k | RowsetSharedPtr new_rowset; |
376 | 8.64k | st = rowset_writer->build(new_rowset); |
377 | 8.64k | if (!st.ok()) { |
378 | 0 | return Status::InternalError("failed to build rowset, version=[{}-{}] status={}", |
379 | 0 | rs_reader->version().first, rs_reader->version().second, |
380 | 0 | st.to_string()); |
381 | 0 | } |
382 | | |
383 | 8.64k | st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id, |
384 | 8.64k | &existed_rs_meta); |
385 | 8.64k | if (!st.ok()) { |
386 | 0 | if (st.is<ALREADY_EXIST>()) { |
387 | 0 | LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet " |
388 | 0 | << _new_tablet->tablet_id(); |
389 | | // Add already committed rowset to _output_rowsets. |
390 | 0 | DCHECK(existed_rs_meta != nullptr); |
391 | 0 | RowsetSharedPtr rowset; |
392 | | // schema is nullptr implies using RowsetMeta.tablet_schema |
393 | 0 | RETURN_IF_ERROR( |
394 | 0 | RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset)); |
395 | 0 | _output_rowsets.push_back(std::move(rowset)); |
396 | 0 | continue; |
397 | 0 | } else { |
398 | 0 | return st; |
399 | 0 | } |
400 | 0 | } |
401 | 8.64k | _output_rowsets.push_back(std::move(new_rowset)); |
402 | | |
403 | 8.64k | VLOG_TRACE << "Successfully convert a history version " << rs_reader->version(); |
404 | 8.64k | } |
405 | 10.4k | auto* sc_job = job.mutable_schema_change(); |
406 | 10.4k | if (!sc_params.ref_rowset_readers.empty()) { |
407 | 5.11k | int64_t num_output_rows = 0; |
408 | 5.11k | int64_t size_output_rowsets = 0; |
409 | 5.11k | int64_t num_output_segments = 0; |
410 | 5.11k | int64_t index_size_output_rowsets = 0; |
411 | 5.11k | int64_t segment_size_output_rowsets = 0; |
412 | 8.51k | for (auto& rs : _output_rowsets) { |
413 | 8.51k | sc_job->add_txn_ids(rs->txn_id()); |
414 | 8.51k | sc_job->add_output_versions(rs->end_version()); |
415 | 8.51k | num_output_rows += rs->num_rows(); |
416 | 8.51k | size_output_rowsets += rs->total_disk_size(); |
417 | 8.51k | num_output_segments += rs->num_segments(); |
418 | 8.51k | index_size_output_rowsets += rs->index_disk_size(); |
419 | 8.51k | segment_size_output_rowsets += rs->data_disk_size(); |
420 | 8.51k | } |
421 | 5.11k | sc_job->set_num_output_rows(num_output_rows); |
422 | 5.11k | sc_job->set_size_output_rowsets(size_output_rowsets); |
423 | 5.11k | sc_job->set_num_output_segments(num_output_segments); |
424 | 5.11k | sc_job->set_num_output_rowsets(_output_rowsets.size()); |
425 | 5.11k | sc_job->set_index_size_output_rowsets(index_size_output_rowsets); |
426 | 5.11k | sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets); |
427 | 5.11k | } |
428 | 10.4k | _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1); |
429 | 10.4k | sc_job->set_output_cumulative_point(_output_cumulative_point); |
430 | | |
431 | 10.4k | DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", DBUG_BLOCK); |
432 | | // process delete bitmap if the table is MOW |
433 | 10.4k | bool has_stop_token {false}; |
434 | 10.4k | bool should_clear_stop_token {true}; |
435 | 10.4k | Defer defer {[&]() { |
436 | 10.3k | if (has_stop_token) { |
437 | 1.16k | static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token( |
438 | 1.16k | _new_tablet, should_clear_stop_token)); |
439 | 1.16k | } |
440 | 10.3k | }}; |
441 | 10.4k | if (_new_tablet->enable_unique_key_merge_on_write()) { |
442 | 1.16k | has_stop_token = true; |
443 | | // If there are historical versions of rowsets, we need to recalculate their delete |
444 | | // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets |
445 | 1.16k | int64_t start_calc_delete_bitmap_version = |
446 | | // [0-1] is a placeholder rowset, start from 2. |
447 | 1.16k | already_exist_any_version ? 2 : sc_job->alter_version() + 1; |
448 | 1.16k | RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(), |
449 | 1.16k | start_calc_delete_bitmap_version, _initiator, |
450 | 1.16k | sc_params.vault_id)); |
451 | 1.16k | sc_job->set_delete_bitmap_lock_initiator(_initiator); |
452 | 1.16k | } |
453 | | |
454 | 10.4k | cloud::FinishTabletJobResponse finish_resp; |
455 | 10.4k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", { |
456 | 10.4k | std::srand(static_cast<unsigned int>(std::time(nullptr))); |
457 | 10.4k | int random_value = std::rand() % 100; |
458 | 10.4k | if (random_value < 20) { |
459 | 10.4k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test txn conflict"); |
460 | 10.4k | } |
461 | 10.4k | }); |
462 | 10.4k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job", { |
463 | 10.4k | LOG_INFO("inject retryable error before commit sc job, tablet={}", |
464 | 10.4k | _new_tablet->tablet_id()); |
465 | 10.4k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected retryable error"); |
466 | 10.4k | }); |
467 | 10.4k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job", |
468 | 10.4k | DBUG_BLOCK); |
469 | 10.4k | auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); |
470 | 10.4k | if (!st.ok()) { |
471 | 0 | if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { |
472 | 0 | st = _new_tablet->sync_rowsets(); |
473 | 0 | if (!st.ok()) { |
474 | 0 | LOG_WARNING("failed to sync new tablet") |
475 | 0 | .tag("tablet_id", _new_tablet->tablet_id()) |
476 | 0 | .error(st); |
477 | 0 | } |
478 | 0 | return Status::OK(); |
479 | 0 | } |
480 | 0 | return st; |
481 | 10.4k | } else { |
482 | 10.4k | should_clear_stop_token = false; |
483 | 10.4k | } |
484 | 10.4k | const auto& stats = finish_resp.stats(); |
485 | 10.4k | { |
486 | | // to prevent the converted historical rowsets be replaced by rowsets written on new tablet |
487 | | // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread |
488 | 10.4k | std::unique_lock lock {_new_tablet->get_sync_meta_lock()}; |
489 | 10.4k | std::unique_lock wlock(_new_tablet->get_header_lock()); |
490 | 10.4k | _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false); |
491 | 10.4k | _new_tablet->set_cumulative_layer_point(_output_cumulative_point); |
492 | 10.4k | _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
493 | 10.4k | stats.num_rows(), stats.data_size()); |
494 | 10.4k | RETURN_IF_ERROR(_new_tablet->set_tablet_state(TABLET_RUNNING)); |
495 | 10.4k | } |
496 | 10.4k | return Status::OK(); |
497 | 10.4k | } |
498 | | |
499 | | Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, |
500 | | int64_t start_calc_delete_bitmap_version, |
501 | | int64_t initiator, |
502 | 1.16k | const std::string& vault_id) { |
503 | 1.16k | LOG_INFO("process mow table") |
504 | 1.16k | .tag("new_tablet_id", _new_tablet->tablet_id()) |
505 | 1.16k | .tag("out_rowset_size", _output_rowsets.size()) |
506 | 1.16k | .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) |
507 | 1.16k | .tag("alter_version", alter_version); |
508 | 1.16k | RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); |
509 | 1.16k | TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta())); |
510 | 1.16k | tmp_meta->delete_bitmap().delete_bitmap.clear(); |
511 | | // Keep only version [0-1] rowset, other rowsets will be added in _output_rowsets |
512 | 1.16k | auto& rs_metas = tmp_meta->all_mutable_rs_metas(); |
513 | 2.34k | for (auto it = rs_metas.begin(); it != rs_metas.end();) { |
514 | 1.17k | const auto& rs_meta = it->second; |
515 | 1.17k | if (rs_meta->version().first == 0 && rs_meta->version().second == 1) { |
516 | 1.16k | ++it; |
517 | 1.16k | } else { |
518 | 11 | it = rs_metas.erase(it); |
519 | 11 | } |
520 | 1.17k | } |
521 | | |
522 | 1.16k | std::shared_ptr<CloudTablet> tmp_tablet = |
523 | 1.16k | std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta); |
524 | 1.16k | { |
525 | 1.16k | std::unique_lock wlock(tmp_tablet->get_header_lock()); |
526 | 1.16k | tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false); |
527 | | // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version |
528 | 1.16k | tmp_tablet->set_alter_version(alter_version); |
529 | 1.16k | } |
530 | | |
531 | | // step 1, process incremental rowset without delete bitmap update lock |
532 | 1.16k | RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); |
533 | 1.16k | int64_t max_version = tmp_tablet->max_version().second; |
534 | 1.16k | LOG(INFO) << "alter table for mow table, calculate delete bitmap of " |
535 | 1.16k | << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version |
536 | 1.16k | << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); |
537 | 1.16k | if (max_version >= start_calc_delete_bitmap_version) { |
538 | 4 | auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( |
539 | 4 | {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {})); |
540 | 4 | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", |
541 | 4 | DBUG_BLOCK); |
542 | 4 | { |
543 | 4 | std::unique_lock wlock(tmp_tablet->get_header_lock()); |
544 | 4 | tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false); |
545 | 4 | } |
546 | 8 | for (auto rowset : ret.rowsets) { |
547 | 8 | RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); |
548 | 8 | } |
549 | 4 | } |
550 | | |
551 | 1.16k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.before_new_inc.block", |
552 | 1.16k | DBUG_BLOCK); |
553 | | |
554 | | // step 2, process incremental rowset with delete bitmap update lock |
555 | 1.16k | RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock( |
556 | 1.16k | *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator)); |
557 | 1.16k | RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); |
558 | 1.16k | int64_t new_max_version = tmp_tablet->max_version().second; |
559 | 1.16k | LOG(INFO) << "alter table for mow table, calculate delete bitmap of " |
560 | 1.16k | << "incremental rowsets with lock, version: " << max_version + 1 << "-" |
561 | 1.16k | << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); |
562 | 1.16k | if (new_max_version > max_version) { |
563 | 2 | auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked( |
564 | 2 | {max_version + 1, new_max_version}, CaptureRowsetOps {})); |
565 | 2 | { |
566 | 2 | std::unique_lock wlock(tmp_tablet->get_header_lock()); |
567 | 2 | tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false); |
568 | 2 | } |
569 | 2 | for (auto rowset : ret.rowsets) { |
570 | 2 | RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); |
571 | 2 | } |
572 | 2 | } |
573 | | |
574 | 1.16k | DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.inject_sleep", { |
575 | 1.16k | auto p = dp->param("percent", 0.01); |
576 | 1.16k | auto sleep_time = dp->param("sleep", 100); |
577 | 1.16k | std::mt19937 gen {std::random_device {}()}; |
578 | 1.16k | std::bernoulli_distribution inject_fault {p}; |
579 | 1.16k | if (inject_fault(gen)) { |
580 | 1.16k | LOG_INFO("injection sleep for {} seconds, tablet_id={}, sc job_id={}", sleep_time, |
581 | 1.16k | _new_tablet->tablet_id(), _job_id); |
582 | 1.16k | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
583 | 1.16k | } |
584 | 1.16k | }); |
585 | | |
586 | 1.16k | auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap(); |
587 | 1.16k | auto storage_resource = _cloud_storage_engine.get_storage_resource(vault_id); |
588 | | // step4, store delete bitmap |
589 | 1.16k | RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap( |
590 | 1.16k | *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap, |
591 | 1.16k | &delete_bitmap, "", storage_resource, config::delete_bitmap_store_write_version)); |
592 | | |
593 | 1.16k | _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap; |
594 | 1.16k | return Status::OK(); |
595 | 1.16k | } |
596 | | |
597 | 78 | void CloudSchemaChangeJob::clean_up_on_failure() { |
598 | 78 | if (_new_tablet == nullptr) { |
599 | 0 | return; |
600 | 0 | } |
601 | 78 | if (_new_tablet->keys_type() == KeysType::UNIQUE_KEYS && |
602 | 78 | _new_tablet->enable_unique_key_merge_on_write()) { |
603 | 35 | _cloud_storage_engine.meta_mgr().remove_delete_bitmap_update_lock( |
604 | 35 | _new_tablet->table_id(), SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, _initiator, |
605 | 35 | _new_tablet->tablet_id()); |
606 | 35 | } |
607 | 122 | for (const auto& output_rs : _output_rowsets) { |
608 | 122 | if (output_rs.use_count() > 2) { |
609 | 0 | LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has " |
610 | 0 | << output_rs.use_count() |
611 | 0 | << " references. File Cache won't be recycled when query is using it."; |
612 | 0 | return; |
613 | 0 | } |
614 | 122 | output_rs->clear_cache(); |
615 | 122 | } |
616 | 78 | } |
617 | | |
618 | | bool CloudSchemaChangeJob::_should_cache_sc_output( |
619 | 10.4k | const std::vector<RowsetSharedPtr>& input_rowsets) { |
620 | 10.4k | int64_t total_size = 0; |
621 | 10.4k | int64_t cached_index_size = 0; |
622 | 10.4k | int64_t cached_data_size = 0; |
623 | | |
624 | 10.4k | for (const auto& rs : input_rowsets) { |
625 | 8.83k | const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta(); |
626 | 8.83k | total_size += rs_meta->total_disk_size(); |
627 | 8.83k | cached_index_size += rs->approximate_cache_index_size(); |
628 | 8.83k | cached_data_size += rs->approximate_cached_data_size(); |
629 | 8.83k | } |
630 | | |
631 | 10.4k | double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size; |
632 | | |
633 | 10.4k | LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. " |
634 | 10.4k | << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size() |
635 | 10.4k | << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size |
636 | 10.4k | << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate |
637 | 10.4k | << ", min_hit_ratio_threshold=" |
638 | 10.4k | << config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache=" |
639 | 10.4k | << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio); |
640 | | |
641 | 10.4k | if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) { |
642 | 1.55k | return true; |
643 | 1.55k | } |
644 | | |
645 | 8.85k | return false; |
646 | 10.4k | } |
647 | | |
648 | | } // namespace doris |