/root/doris/be/src/olap/tablet_manager.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/tablet_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/AgentService_types.h> |
22 | | #include <gen_cpp/BackendService_types.h> |
23 | | #include <gen_cpp/Descriptors_types.h> |
24 | | #include <gen_cpp/MasterService_types.h> |
25 | | #include <gen_cpp/Types_types.h> |
26 | | #include <gen_cpp/olap_file.pb.h> |
27 | | #include <re2/re2.h> |
28 | | #include <unistd.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <list> |
32 | | #include <mutex> |
33 | | #include <ostream> |
34 | | #include <string_view> |
35 | | |
36 | | #include "bvar/bvar.h" |
37 | | #include "common/compiler_util.h" // IWYU pragma: keep |
38 | | #include "common/config.h" |
39 | | #include "common/logging.h" |
40 | | #include "gutil/strings/strcat.h" |
41 | | #include "gutil/strings/substitute.h" |
42 | | #include "io/fs/local_file_system.h" |
43 | | #include "olap/cumulative_compaction_time_series_policy.h" |
44 | | #include "olap/data_dir.h" |
45 | | #include "olap/olap_common.h" |
46 | | #include "olap/olap_define.h" |
47 | | #include "olap/olap_meta.h" |
48 | | #include "olap/pb_helper.h" |
49 | | #include "olap/rowset/beta_rowset.h" |
50 | | #include "olap/rowset/rowset.h" |
51 | | #include "olap/rowset/rowset_meta_manager.h" |
52 | | #include "olap/storage_engine.h" |
53 | | #include "olap/tablet.h" |
54 | | #include "olap/tablet_meta.h" |
55 | | #include "olap/tablet_meta_manager.h" |
56 | | #include "olap/tablet_schema.h" |
57 | | #include "olap/txn_manager.h" |
58 | | #include "runtime/exec_env.h" |
59 | | #include "service/backend_options.h" |
60 | | #include "util/defer_op.h" |
61 | | #include "util/doris_metrics.h" |
62 | | #include "util/histogram.h" |
63 | | #include "util/metrics.h" |
64 | | #include "util/path_util.h" |
65 | | #include "util/scoped_cleanup.h" |
66 | | #include "util/stopwatch.hpp" |
67 | | #include "util/time.h" |
68 | | #include "util/trace.h" |
69 | | #include "util/uid_util.h" |
70 | | |
71 | | namespace doris { |
72 | | class CumulativeCompactionPolicy; |
73 | | } // namespace doris |
74 | | |
75 | | using std::map; |
76 | | using std::set; |
77 | | using std::string; |
78 | | using std::vector; |
79 | | |
80 | | namespace doris { |
81 | | using namespace ErrorCode; |
82 | | |
83 | | bvar::Adder<int64_t> g_tablet_meta_schema_columns_count("tablet_meta_schema_columns_count"); |
84 | | |
85 | | TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) |
86 | | : _engine(engine), |
87 | | _tablets_shards_size(tablet_map_lock_shard_size), |
88 | 206 | _tablets_shards_mask(tablet_map_lock_shard_size - 1) { |
89 | 206 | CHECK_GT(_tablets_shards_size, 0); |
90 | 206 | CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); |
91 | 206 | _tablets_shards.resize(_tablets_shards_size); |
92 | 206 | } |
93 | | |
94 | 206 | TabletManager::~TabletManager() = default; |
95 | | |
96 | | Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet, |
97 | 304 | bool update_meta, bool force, RuntimeProfile* profile) { |
98 | 304 | if (profile->get_counter("AddTablet") == nullptr) { |
99 | 4 | ADD_TIMER(profile, "AddTablet"); |
100 | 4 | } |
101 | 304 | Status res = Status::OK(); |
102 | 304 | VLOG_NOTICE << "begin to add tablet to TabletManager. " |
103 | 0 | << "tablet_id=" << tablet_id << ", force=" << force; |
104 | | |
105 | 304 | TabletSharedPtr existed_tablet = nullptr; |
106 | 304 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
107 | 304 | const auto& iter = tablet_map.find(tablet_id); |
108 | 304 | if (iter != tablet_map.end()) { |
109 | 4 | existed_tablet = iter->second; |
110 | 4 | } |
111 | | |
112 | 304 | if (existed_tablet == nullptr) { |
113 | 300 | return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/, |
114 | 300 | false /*drop_old*/, profile); |
115 | 300 | } |
116 | | // During restore process, the tablet is exist and snapshot loader will replace the tablet's rowsets |
117 | | // and then reload the tablet, the tablet's path will the same |
118 | 4 | if (!force) { |
119 | 2 | if (existed_tablet->tablet_path() == tablet->tablet_path()) { |
120 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
121 | 0 | "add the same tablet twice! tablet_id={}, tablet_path={}", tablet_id, |
122 | 0 | tablet->tablet_path()); |
123 | 0 | } |
124 | 2 | if (existed_tablet->data_dir() == tablet->data_dir()) { |
125 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
126 | 0 | "add tablet with same data dir twice! tablet_id={}", tablet_id); |
127 | 0 | } |
128 | 2 | } |
129 | | |
130 | 4 | MonotonicStopWatch watch; |
131 | 4 | watch.start(); |
132 | | |
133 | | // During storage migration, the tablet is moved to another disk, have to check |
134 | | // if the new tablet's rowset version is larger than the old one to prevent losting data during |
135 | | // migration |
136 | 4 | int64_t old_time, new_time; |
137 | 4 | int32_t old_version, new_version; |
138 | 4 | { |
139 | 4 | std::shared_lock rdlock(existed_tablet->get_header_lock()); |
140 | 4 | const RowsetSharedPtr old_rowset = existed_tablet->get_rowset_with_max_version(); |
141 | 4 | const RowsetSharedPtr new_rowset = tablet->get_rowset_with_max_version(); |
142 | | // If new tablet is empty, it is a newly created schema change tablet. |
143 | | // the old tablet is dropped before add tablet. it should not exist old tablet |
144 | 4 | if (new_rowset == nullptr) { |
145 | | // it seems useless to call unlock and return here. |
146 | | // it could prevent error when log level is changed in the future. |
147 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
148 | 0 | "new tablet is empty and old tablet exists. it should not happen. tablet_id={}", |
149 | 0 | tablet_id); |
150 | 0 | } |
151 | 4 | old_time = old_rowset == nullptr ? -1 : old_rowset->creation_time(); |
152 | 4 | new_time = new_rowset->creation_time(); |
153 | 4 | old_version = old_rowset == nullptr ? -1 : old_rowset->end_version(); |
154 | 4 | new_version = new_rowset->end_version(); |
155 | 4 | } |
156 | 4 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetExistTabletVersion", "AddTablet"), |
157 | 4 | static_cast<int64_t>(watch.reset())); |
158 | | |
159 | | // In restore process, we replace all origin files in tablet dir with |
160 | | // the downloaded snapshot files. Then we try to reload tablet header. |
161 | | // force == true means we forcibly replace the Tablet in tablet_map |
162 | | // with the new one. But if we do so, the files in the tablet dir will be |
163 | | // dropped when the origin Tablet deconstruct. |
164 | | // So we set keep_files == true to not delete files when the |
165 | | // origin Tablet deconstruct. |
166 | | // During restore process, snapshot loader |
167 | | // replaced the old tablet's rowset with new rowsets, but the tablet path is reused, if drop files |
168 | | // here, the new rowset's file will also be dropped, so use keep files here |
169 | 4 | bool keep_files = force; |
170 | 4 | if (force || |
171 | 4 | (new_version > old_version || (new_version == old_version && new_time >= old_time))) { |
172 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
173 | 4 | res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files, |
174 | 4 | true /*drop_old*/, profile); |
175 | 4 | } else { |
176 | 0 | RETURN_IF_ERROR(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
177 | 0 | tablet->save_meta(); |
178 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
179 | 0 | static_cast<int64_t>(watch.reset())); |
180 | 0 | { |
181 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
182 | 0 | _shutdown_tablets.push_back(tablet); |
183 | 0 | } |
184 | |
|
185 | 0 | res = Status::Error<ENGINE_INSERT_OLD_TABLET>( |
186 | 0 | "set tablet to shutdown state. tablet_id={}, tablet_path={}", tablet->tablet_id(), |
187 | 0 | tablet->tablet_path()); |
188 | 0 | } |
189 | 4 | LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res |
190 | 4 | << ", tablet_id=" << tablet_id << ", old_version=" << old_version |
191 | 4 | << ", new_version=" << new_version << ", old_time=" << old_time |
192 | 4 | << ", new_time=" << new_time |
193 | 4 | << ", old_tablet_path=" << existed_tablet->tablet_path() |
194 | 4 | << ", new_tablet_path=" << tablet->tablet_path(); |
195 | | |
196 | 4 | return res; |
197 | 4 | } |
198 | | |
199 | | Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, |
200 | | const TabletSharedPtr& tablet, bool update_meta, |
201 | | bool keep_files, bool drop_old, |
202 | 304 | RuntimeProfile* profile) { |
203 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
204 | 304 | Status res = Status::OK(); |
205 | 304 | MonotonicStopWatch watch; |
206 | 304 | watch.start(); |
207 | 304 | if (update_meta) { |
208 | | // call tablet save meta in order to valid the meta |
209 | 304 | tablet->save_meta(); |
210 | 304 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
211 | 304 | static_cast<int64_t>(watch.reset())); |
212 | 304 | } |
213 | 304 | if (drop_old) { |
214 | | // If the new tablet is fresher than the existing one, then replace |
215 | | // the existing tablet with the new one. |
216 | | // Use default replica_id to ignore whether replica_id is match when drop tablet. |
217 | 4 | Status status = _drop_tablet(tablet_id, /* replica_id */ 0, keep_files, false, true); |
218 | 4 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropOldTablet", "AddTablet"), |
219 | 4 | static_cast<int64_t>(watch.reset())); |
220 | 4 | RETURN_NOT_OK_STATUS_WITH_WARN( |
221 | 4 | status, strings::Substitute("failed to drop old tablet when add new tablet. " |
222 | 4 | "tablet_id=$0", |
223 | 4 | tablet_id)); |
224 | 4 | } |
225 | | // Register tablet into DataDir, so that we can manage tablet from |
226 | | // the perspective of root path. |
227 | | // Example: unregister all tables when a bad disk found. |
228 | 304 | tablet->register_tablet_into_dir(); |
229 | 304 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
230 | 304 | tablet_map[tablet_id] = tablet; |
231 | 304 | _add_tablet_to_partition(tablet); |
232 | 304 | g_tablet_meta_schema_columns_count << tablet->tablet_meta()->tablet_columns_num(); |
233 | 304 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RegisterTabletInfo", "AddTablet"), |
234 | 304 | static_cast<int64_t>(watch.reset())); |
235 | | |
236 | 304 | VLOG_NOTICE << "add tablet to map successfully." |
237 | 0 | << " tablet_id=" << tablet_id; |
238 | | |
239 | 304 | return res; |
240 | 304 | } |
241 | | |
242 | 0 | bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) { |
243 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
244 | 0 | return _check_tablet_id_exist_unlocked(tablet_id); |
245 | 0 | } |
246 | | |
247 | 0 | bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { |
248 | 0 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
249 | 0 | return tablet_map.find(tablet_id) != tablet_map.end(); |
250 | 0 | } |
251 | | |
252 | | Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores, |
253 | 303 | RuntimeProfile* profile) { |
254 | 303 | DorisMetrics::instance()->create_tablet_requests_total->increment(1); |
255 | | |
256 | 303 | int64_t tablet_id = request.tablet_id; |
257 | 303 | LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id |
258 | 303 | << ", table_id=" << request.table_id << ", partition_id=" << request.partition_id |
259 | 303 | << ", replica_id=" << request.replica_id << ", stores.size=" << stores.size() |
260 | 303 | << ", first store=" << stores[0]->path(); |
261 | | |
262 | | // when we create rollup tablet A(assume on shard-1) from tablet B(assume on shard-2) |
263 | | // we need use write lock on shard-1 and then use read lock on shard-2 |
264 | | // if there have create rollup tablet C(assume on shard-2) from tablet D(assume on shard-1) at the same time, we will meet deadlock |
265 | 303 | std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock); |
266 | 303 | bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
267 | 303 | bool is_schema_change_or_atomic_restore = |
268 | 303 | request.__isset.base_tablet_id && request.base_tablet_id > 0; |
269 | 303 | bool need_two_lock = |
270 | 303 | is_schema_change_or_atomic_restore && |
271 | 303 | ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id)); |
272 | 303 | if (need_two_lock) { |
273 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock")); |
274 | 0 | two_tablet_lock.lock(); |
275 | 0 | } |
276 | | |
277 | 303 | MonotonicStopWatch shard_lock_watch; |
278 | 303 | shard_lock_watch.start(); |
279 | 303 | std::lock_guard wrlock(_get_tablets_shard_lock(tablet_id)); |
280 | 303 | shard_lock_watch.stop(); |
281 | 303 | COUNTER_UPDATE(ADD_TIMER(profile, "GetShardLock"), |
282 | 303 | static_cast<int64_t>(shard_lock_watch.elapsed_time())); |
283 | | // Make create_tablet operation to be idempotent: |
284 | | // 1. Return true if tablet with same tablet_id and schema_hash exist; |
285 | | // false if tablet with same tablet_id but different schema_hash exist. |
286 | | // 2. When this is an alter task, if the tablet(both tablet_id and schema_hash are |
287 | | // same) already exist, then just return true(an duplicate request). But if |
288 | | // tablet_id exist but with different schema_hash, return an error(report task will |
289 | | // eventually trigger its deletion). |
290 | 303 | { |
291 | 303 | SCOPED_TIMER(ADD_TIMER(profile, "GetTabletUnlocked")); |
292 | 303 | if (_get_tablet_unlocked(tablet_id) != nullptr) { |
293 | 3 | LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; |
294 | 3 | return Status::OK(); |
295 | 3 | } |
296 | 303 | } |
297 | | |
298 | 300 | TabletSharedPtr base_tablet = nullptr; |
299 | | // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request |
300 | 300 | if (is_schema_change_or_atomic_restore) { |
301 | | // if base_tablet_id's lock diffrent with new_tablet_id, we need lock it. |
302 | 0 | if (need_two_lock) { |
303 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet")); |
304 | 0 | base_tablet = get_tablet(request.base_tablet_id); |
305 | 0 | two_tablet_lock.unlock(); |
306 | 0 | } else { |
307 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTabletUnlocked")); |
308 | 0 | base_tablet = _get_tablet_unlocked(request.base_tablet_id); |
309 | 0 | } |
310 | 0 | if (base_tablet == nullptr) { |
311 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
312 | 0 | return Status::Error<TABLE_CREATE_META_ERROR>( |
313 | 0 | "fail to create tablet(change schema/atomic restore), base tablet does not " |
314 | 0 | "exist. new_tablet_id={}, base_tablet_id={}", |
315 | 0 | tablet_id, request.base_tablet_id); |
316 | 0 | } |
317 | | // If we are doing schema-change or atomic-restore, we should use the same data dir |
318 | | // TODO(lingbin): A litter trick here, the directory should be determined before |
319 | | // entering this method |
320 | | // |
321 | | // ATTN: Since all restored replicas will be saved to HDD, so no storage_medium check here. |
322 | 0 | if (in_restore_mode || |
323 | 0 | request.storage_medium == base_tablet->data_dir()->storage_medium()) { |
324 | 0 | LOG(INFO) << "create tablet use the base tablet data dir. tablet_id=" << tablet_id |
325 | 0 | << ", base tablet_id=" << request.base_tablet_id |
326 | 0 | << ", data dir=" << base_tablet->data_dir()->path(); |
327 | 0 | stores.clear(); |
328 | 0 | stores.push_back(base_tablet->data_dir()); |
329 | 0 | } |
330 | 0 | } |
331 | | |
332 | | // set alter type to schema-change. it is useless |
333 | 300 | TabletSharedPtr tablet = _internal_create_tablet_unlocked( |
334 | 300 | request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile); |
335 | 300 | if (tablet == nullptr) { |
336 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
337 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}", |
338 | 0 | request.tablet_id); |
339 | 0 | } |
340 | | |
341 | 300 | LOG(INFO) << "success to create tablet. tablet_id=" << tablet_id |
342 | 300 | << ", tablet_path=" << tablet->tablet_path(); |
343 | 300 | return Status::OK(); |
344 | 300 | } |
345 | | |
346 | | TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( |
347 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
348 | 300 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
349 | | // If in schema-change state, base_tablet must also be provided. |
350 | | // i.e., is_schema_change and base_tablet are either assigned or not assigned |
351 | 300 | DCHECK((is_schema_change && base_tablet) || (!is_schema_change && !base_tablet)); |
352 | | |
353 | | // NOTE: The existence of tablet_id and schema_hash has already been checked, |
354 | | // no need check again here. |
355 | | |
356 | 300 | const std::string parent_timer_name = "InternalCreateTablet"; |
357 | 300 | SCOPED_TIMER(ADD_TIMER(profile, parent_timer_name)); |
358 | | |
359 | 300 | MonotonicStopWatch watch; |
360 | 300 | watch.start(); |
361 | 300 | auto create_meta_timer = ADD_CHILD_TIMER(profile, "CreateMeta", parent_timer_name); |
362 | 300 | auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change, base_tablet, |
363 | 300 | data_dirs, profile); |
364 | 300 | COUNTER_UPDATE(create_meta_timer, static_cast<int64_t>(watch.reset())); |
365 | 300 | if (tablet == nullptr) { |
366 | 0 | return nullptr; |
367 | 0 | } |
368 | | |
369 | 300 | int64_t new_tablet_id = request.tablet_id; |
370 | 300 | int32_t new_schema_hash = request.tablet_schema.schema_hash; |
371 | | |
372 | | // should remove the tablet's pending_id no matter create-tablet success or not |
373 | 300 | DataDir* data_dir = tablet->data_dir(); |
374 | | |
375 | | // TODO(yiguolei) |
376 | | // the following code is very difficult to understand because it mixed alter tablet v2 |
377 | | // and alter tablet v1 should remove alter tablet v1 code after v0.12 |
378 | 300 | Status res = Status::OK(); |
379 | 300 | bool is_tablet_added = false; |
380 | 300 | do { |
381 | 300 | res = tablet->init(); |
382 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "TabletInit", parent_timer_name), |
383 | 300 | static_cast<int64_t>(watch.reset())); |
384 | 300 | if (!res.ok()) { |
385 | 0 | LOG(WARNING) << "tablet init failed. tablet:" << tablet->tablet_id(); |
386 | 0 | break; |
387 | 0 | } |
388 | | |
389 | | // Create init version if this is not a restore mode replica and request.version is set |
390 | | // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
391 | | // if (!in_restore_mode && request.__isset.version) { |
392 | | // create initial rowset before add it to storage engine could omit many locks |
393 | 300 | res = tablet->create_initial_rowset(request.version); |
394 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "InitRowset", parent_timer_name), |
395 | 300 | static_cast<int64_t>(watch.reset())); |
396 | 300 | if (!res.ok()) { |
397 | 0 | LOG(WARNING) << "fail to create initial version for tablet. res=" << res; |
398 | 0 | break; |
399 | 0 | } |
400 | | |
401 | 300 | if (is_schema_change) { |
402 | | // if this is a new alter tablet, has to set its state to not ready |
403 | | // because schema change handler depends on it to check whether history data |
404 | | // convert finished |
405 | 0 | static_cast<void>(tablet->set_tablet_state(TabletState::TABLET_NOTREADY)); |
406 | 0 | } |
407 | | // Add tablet to StorageEngine will make it visible to user |
408 | | // Will persist tablet meta |
409 | 300 | auto add_tablet_timer = ADD_CHILD_TIMER(profile, "AddTablet", parent_timer_name); |
410 | 300 | res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false, profile); |
411 | 300 | COUNTER_UPDATE(add_tablet_timer, static_cast<int64_t>(watch.reset())); |
412 | 300 | if (!res.ok()) { |
413 | 0 | LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; |
414 | 0 | break; |
415 | 0 | } |
416 | 300 | is_tablet_added = true; |
417 | | |
418 | | // TODO(lingbin): The following logic seems useless, can be removed? |
419 | | // Because if _add_tablet_unlocked() return OK, we must can get it from map. |
420 | 300 | TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id); |
421 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetTablet", parent_timer_name), |
422 | 300 | static_cast<int64_t>(watch.reset())); |
423 | 300 | if (tablet_ptr == nullptr) { |
424 | 0 | res = Status::Error<TABLE_NOT_FOUND>("fail to get tablet. res={}", res); |
425 | 0 | break; |
426 | 0 | } |
427 | 300 | } while (false); |
428 | | |
429 | 300 | if (res.ok()) { |
430 | 300 | return tablet; |
431 | 300 | } |
432 | | // something is wrong, we need clear environment |
433 | 0 | if (is_tablet_added) { |
434 | 0 | Status status = _drop_tablet(new_tablet_id, request.replica_id, false, false, true); |
435 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropTablet", parent_timer_name), |
436 | 0 | static_cast<int64_t>(watch.reset())); |
437 | 0 | if (!status.ok()) { |
438 | 0 | LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; |
439 | 0 | } |
440 | 0 | } else { |
441 | 0 | tablet->delete_all_files(); |
442 | 0 | static_cast<void>(TabletMetaManager::remove(data_dir, new_tablet_id, new_schema_hash)); |
443 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemoveTabletFiles", parent_timer_name), |
444 | 0 | static_cast<int64_t>(watch.reset())); |
445 | 0 | } |
446 | 0 | return nullptr; |
447 | 300 | } |
448 | | |
449 | 300 | static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t tablet_id) { |
450 | 300 | string path = dir; |
451 | 300 | path = path_util::join_path_segments(path, DATA_PREFIX); |
452 | 300 | path = path_util::join_path_segments(path, std::to_string(shard_id)); |
453 | 300 | path = path_util::join_path_segments(path, std::to_string(tablet_id)); |
454 | 300 | return path; |
455 | 300 | } |
456 | | |
457 | | TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( |
458 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
459 | 300 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
460 | 300 | string pending_id = StrCat(TABLET_ID_PREFIX, request.tablet_id); |
461 | | // Many attempts are made here in the hope that even if a disk fails, it can still continue. |
462 | 300 | std::string parent_timer_name = "CreateMeta"; |
463 | 300 | MonotonicStopWatch watch; |
464 | 300 | watch.start(); |
465 | 300 | for (auto& data_dir : data_dirs) { |
466 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemovePendingIds", parent_timer_name), |
467 | 300 | static_cast<int64_t>(watch.reset())); |
468 | | |
469 | 300 | TabletMetaSharedPtr tablet_meta; |
470 | | // if create meta failed, do not need to clean dir, because it is only in memory |
471 | 300 | Status res = _create_tablet_meta_unlocked(request, data_dir, is_schema_change, base_tablet, |
472 | 300 | &tablet_meta); |
473 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateMetaUnlock", parent_timer_name), |
474 | 300 | static_cast<int64_t>(watch.reset())); |
475 | 300 | if (!res.ok()) { |
476 | 0 | LOG(WARNING) << "fail to create tablet meta. res=" << res |
477 | 0 | << ", root=" << data_dir->path(); |
478 | 0 | continue; |
479 | 0 | } |
480 | | |
481 | 300 | string tablet_dir = |
482 | 300 | _gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(), request.tablet_id); |
483 | 300 | string schema_hash_dir = path_util::join_path_segments( |
484 | 300 | tablet_dir, std::to_string(request.tablet_schema.schema_hash)); |
485 | | |
486 | | // Because the tablet is removed asynchronously, so that the dir may still exist when BE |
487 | | // receive create-tablet request again, For example retried schema-change request |
488 | 300 | bool exists = true; |
489 | 300 | res = io::global_local_filesystem()->exists(schema_hash_dir, &exists); |
490 | 300 | if (!res.ok()) { |
491 | 0 | continue; |
492 | 0 | } |
493 | 300 | if (exists) { |
494 | 0 | LOG(WARNING) << "skip this dir because tablet path exist, path=" << schema_hash_dir; |
495 | 0 | continue; |
496 | 300 | } else { |
497 | 300 | Status st = io::global_local_filesystem()->create_directory(schema_hash_dir); |
498 | 300 | if (!st.ok()) { |
499 | 0 | continue; |
500 | 0 | } |
501 | 300 | } |
502 | | |
503 | 300 | if (tablet_meta->partition_id() <= 0) { |
504 | 233 | LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << ", tablet " |
505 | 233 | << tablet_meta->tablet_id(); |
506 | 233 | } |
507 | 300 | TabletSharedPtr new_tablet = |
508 | 300 | std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
509 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateTabletFromMeta", parent_timer_name), |
510 | 300 | static_cast<int64_t>(watch.reset())); |
511 | 300 | return new_tablet; |
512 | 300 | } |
513 | 0 | return nullptr; |
514 | 300 | } |
515 | | |
516 | | Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, |
517 | 252 | bool is_drop_table_or_partition) { |
518 | 252 | return _drop_tablet(tablet_id, replica_id, false, is_drop_table_or_partition, false); |
519 | 252 | } |
520 | | |
521 | | // Drop specified tablet. |
522 | | Status TabletManager::_drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files, |
523 | 256 | bool is_drop_table_or_partition, bool had_held_shard_lock) { |
524 | 256 | LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id |
525 | 256 | << ", is_drop_table_or_partition=" << is_drop_table_or_partition |
526 | 256 | << ", keep_files=" << keep_files; |
527 | 256 | DorisMetrics::instance()->drop_tablet_requests_total->increment(1); |
528 | | |
529 | 256 | RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet")); |
530 | 256 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }}; |
531 | | |
532 | | // Fetch tablet which need to be dropped |
533 | 256 | TabletSharedPtr to_drop_tablet; |
534 | 256 | { |
535 | 256 | std::unique_lock<std::shared_mutex> wlock(_get_tablets_shard_lock(tablet_id), |
536 | 256 | std::defer_lock); |
537 | 256 | if (!had_held_shard_lock) { |
538 | 252 | wlock.lock(); |
539 | 252 | } |
540 | 256 | to_drop_tablet = _get_tablet_unlocked(tablet_id); |
541 | 256 | if (to_drop_tablet == nullptr) { |
542 | 1 | LOG(WARNING) << "fail to drop tablet because it does not exist. " |
543 | 1 | << "tablet_id=" << tablet_id; |
544 | 1 | return Status::OK(); |
545 | 1 | } |
546 | | |
547 | | // We should compare replica id to avoid dropping new cloned tablet. |
548 | | // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. |
549 | 255 | if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { |
550 | 0 | return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(), |
551 | 0 | replica_id); |
552 | 0 | } |
553 | | |
554 | 255 | _remove_tablet_from_partition(to_drop_tablet); |
555 | 255 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
556 | 255 | tablet_map.erase(tablet_id); |
557 | 255 | } |
558 | | |
559 | 0 | to_drop_tablet->clear_cache(); |
560 | | |
561 | 255 | { |
562 | | // drop tablet will update tablet meta, should lock |
563 | 255 | std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock()); |
564 | 255 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
565 | | // NOTE: has to update tablet here, but must not update tablet meta directly. |
566 | | // because other thread may hold the tablet object, they may save meta too. |
567 | | // If update meta directly here, other thread may override the meta |
568 | | // and the tablet will be loaded at restart time. |
569 | | // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. |
570 | | // |
571 | | // Until now, only the restore task uses keep files. |
572 | 255 | RETURN_IF_ERROR(to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN)); |
573 | 255 | if (!keep_files) { |
574 | 253 | LOG(INFO) << "set tablet to shutdown state and remove it from memory. " |
575 | 253 | << "tablet_id=" << tablet_id |
576 | 253 | << ", tablet_path=" << to_drop_tablet->tablet_path(); |
577 | | // We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta, |
578 | | // otherwise if BE shutdown after saving tablet state, these remote rowsets path info will lost. |
579 | 253 | if (is_drop_table_or_partition) { |
580 | 0 | RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets()); |
581 | 0 | } |
582 | 253 | to_drop_tablet->save_meta(); |
583 | 253 | { |
584 | 253 | std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
585 | 253 | _shutdown_tablets.push_back(to_drop_tablet); |
586 | 253 | } |
587 | 253 | } |
588 | 255 | } |
589 | | |
590 | 255 | to_drop_tablet->deregister_tablet_from_dir(); |
591 | 255 | g_tablet_meta_schema_columns_count << -to_drop_tablet->tablet_meta()->tablet_columns_num(); |
592 | 255 | return Status::OK(); |
593 | 255 | } |
594 | | |
595 | 2.69k | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) { |
596 | 2.69k | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
597 | 2.69k | return _get_tablet_unlocked(tablet_id, include_deleted, err); |
598 | 2.69k | } |
599 | | |
600 | 0 | std::vector<TabletSharedPtr> TabletManager::get_all_tablet(std::function<bool(Tablet*)>&& filter) { |
601 | 0 | std::vector<TabletSharedPtr> res; |
602 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { res.emplace_back(tablet); }, |
603 | 0 | std::move(filter)); |
604 | 0 | return res; |
605 | 0 | } |
606 | | |
607 | | void TabletManager::for_each_tablet(std::function<void(const TabletSharedPtr&)>&& handler, |
608 | 11 | std::function<bool(Tablet*)>&& filter) { |
609 | 11 | std::vector<TabletSharedPtr> tablets; |
610 | 11 | for (const auto& tablets_shard : _tablets_shards) { |
611 | 11 | tablets.clear(); |
612 | 11 | { |
613 | 11 | std::shared_lock rdlock(tablets_shard.lock); |
614 | 257 | for (const auto& [id, tablet] : tablets_shard.tablet_map) { |
615 | 257 | if (filter(tablet.get())) { |
616 | 257 | tablets.emplace_back(tablet); |
617 | 257 | } |
618 | 257 | } |
619 | 11 | } |
620 | 257 | for (const auto& tablet : tablets) { |
621 | 257 | handler(tablet); |
622 | 257 | } |
623 | 11 | } |
624 | 11 | } |
625 | | |
626 | | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, |
627 | 2.69k | string* err) { |
628 | 2.69k | TabletSharedPtr tablet; |
629 | 2.69k | tablet = _get_tablet_unlocked(tablet_id); |
630 | 2.69k | if (tablet == nullptr && include_deleted) { |
631 | 4 | std::shared_lock rdlock(_shutdown_tablets_lock); |
632 | 4 | for (auto& deleted_tablet : _shutdown_tablets) { |
633 | 2 | CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr"; |
634 | 2 | if (deleted_tablet->tablet_id() == tablet_id) { |
635 | 2 | tablet = deleted_tablet; |
636 | 2 | break; |
637 | 2 | } |
638 | 2 | } |
639 | 4 | } |
640 | | |
641 | 2.69k | if (tablet == nullptr) { |
642 | 239 | if (err != nullptr) { |
643 | 1 | *err = "tablet does not exist. " + BackendOptions::get_localhost(); |
644 | 1 | } |
645 | 239 | return nullptr; |
646 | 239 | } |
647 | | #ifndef BE_TEST |
648 | | if (!tablet->is_used()) { |
649 | | LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; |
650 | | if (err != nullptr) { |
651 | | *err = "tablet cannot be used. " + BackendOptions::get_localhost(); |
652 | | } |
653 | | return nullptr; |
654 | | } |
655 | | #endif |
656 | | |
657 | 2.45k | return tablet; |
658 | 2.69k | } |
659 | | |
660 | | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, TabletUid tablet_uid, |
661 | 0 | bool include_deleted, string* err) { |
662 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
663 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, include_deleted, err); |
664 | 0 | if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) { |
665 | 0 | return tablet; |
666 | 0 | } |
667 | 0 | return nullptr; |
668 | 0 | } |
669 | | |
670 | 0 | uint64_t TabletManager::get_rowset_nums() { |
671 | 0 | uint64_t rowset_nums = 0; |
672 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { rowset_nums += tablet->version_count(); }, |
673 | 0 | filter_all_tablets); |
674 | 0 | return rowset_nums; |
675 | 0 | } |
676 | | |
677 | 0 | uint64_t TabletManager::get_segment_nums() { |
678 | 0 | uint64_t segment_nums = 0; |
679 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { segment_nums += tablet->segment_count(); }, |
680 | 0 | filter_all_tablets); |
681 | 0 | return segment_nums; |
682 | 0 | } |
683 | | |
684 | | bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, |
685 | | TTabletId* tablet_id, |
686 | 37 | TSchemaHash* schema_hash) { |
687 | | // the path like: /data/14/10080/964828783/ |
688 | 37 | static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); |
689 | | // match tablet schema hash data path, for example, the path is /data/1/16791/29998 |
690 | | // 1 is shard id , 16791 is tablet id, 29998 is schema hash |
691 | 37 | if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) { |
692 | 33 | return true; |
693 | 33 | } |
694 | | |
695 | | // If we can't match normal path pattern, this may be a path which is a empty tablet |
696 | | // directory. Use this pattern to match empty tablet directory. In this case schema_hash |
697 | | // will be set to zero. |
698 | 4 | static re2::RE2 empty_tablet_re("/data/\\d+/(\\d+)($|/$)"); |
699 | 4 | if (!RE2::PartialMatch(path, empty_tablet_re, tablet_id)) { |
700 | 2 | return false; |
701 | 2 | } |
702 | 2 | *schema_hash = 0; |
703 | 2 | return true; |
704 | 4 | } |
705 | | |
706 | 3 | bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { |
707 | | // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat |
708 | 3 | static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); |
709 | 3 | string id_str; |
710 | 3 | bool ret = RE2::PartialMatch(path, re, &id_str); |
711 | 3 | if (ret) { |
712 | 1 | rowset_id->init(id_str); |
713 | 1 | return true; |
714 | 1 | } |
715 | 2 | return false; |
716 | 3 | } |
717 | | |
718 | 0 | void TabletManager::get_tablet_stat(TTabletStatResult* result) { |
719 | 0 | std::shared_ptr<std::vector<TTabletStat>> local_cache; |
720 | 0 | { |
721 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
722 | 0 | local_cache = _tablet_stat_list_cache; |
723 | 0 | } |
724 | 0 | result->__set_tablet_stat_list(*local_cache); |
725 | 0 | } |
726 | | |
727 | | struct TabletScore { |
728 | | TabletSharedPtr tablet_ptr; |
729 | | int score; |
730 | | }; |
731 | | |
732 | | std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( |
733 | | CompactionType compaction_type, DataDir* data_dir, |
734 | | const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, |
735 | | const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& |
736 | 6 | all_cumulative_compaction_policies) { |
737 | 6 | int64_t now_ms = UnixMillis(); |
738 | 6 | const string& compaction_type_str = |
739 | 6 | compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative"; |
740 | 6 | uint32_t highest_score = 0; |
741 | | // find the single compaction tablet |
742 | 6 | uint32_t single_compact_highest_score = 0; |
743 | 6 | TabletSharedPtr best_tablet; |
744 | 6 | TabletSharedPtr best_single_compact_tablet; |
745 | 92 | auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; }; |
746 | 6 | std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp); |
747 | | |
748 | 257 | auto handler = [&](const TabletSharedPtr& tablet_ptr) { |
749 | 257 | if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) { |
750 | 0 | LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id() |
751 | 0 | << " will be ignored by automatic compaction tasks since it's " |
752 | 0 | << "set to disabled automatic compaction."; |
753 | 0 | return; |
754 | 0 | } |
755 | | |
756 | 257 | if (config::enable_skip_tablet_compaction && |
757 | 257 | tablet_ptr->should_skip_compaction(compaction_type, UnixSeconds())) { |
758 | 0 | return; |
759 | 0 | } |
760 | 257 | if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { |
761 | 0 | return; |
762 | 0 | } |
763 | | |
764 | 257 | auto search = tablet_submitted_compaction.find(tablet_ptr); |
765 | 257 | if (search != tablet_submitted_compaction.end()) { |
766 | 0 | return; |
767 | 0 | } |
768 | | |
769 | 257 | int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time(); |
770 | 257 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
771 | 0 | last_failure_ms = tablet_ptr->last_base_compaction_failure_time(); |
772 | 0 | } |
773 | 257 | if (now_ms - last_failure_ms <= config::tablet_sched_delay_time_ms) { |
774 | 0 | VLOG_DEBUG << "Too often to check compaction, skip it. " |
775 | 0 | << "compaction_type=" << compaction_type_str |
776 | 0 | << ", last_failure_time_ms=" << last_failure_ms |
777 | 0 | << ", tablet_id=" << tablet_ptr->tablet_id(); |
778 | 0 | return; |
779 | 0 | } |
780 | | |
781 | 257 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
782 | 0 | std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(), |
783 | 0 | std::try_to_lock); |
784 | 0 | if (!lock.owns_lock()) { |
785 | 0 | LOG(INFO) << "can not get base lock: " << tablet_ptr->tablet_id(); |
786 | 0 | return; |
787 | 0 | } |
788 | 257 | } else { |
789 | 257 | std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(), |
790 | 257 | std::try_to_lock); |
791 | 257 | if (!lock.owns_lock()) { |
792 | 0 | LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id(); |
793 | 0 | return; |
794 | 0 | } |
795 | 257 | } |
796 | 257 | auto cumulative_compaction_policy = all_cumulative_compaction_policies.at( |
797 | 257 | tablet_ptr->tablet_meta()->compaction_policy()); |
798 | 257 | uint32_t current_compaction_score = tablet_ptr->calc_compaction_score(); |
799 | 257 | if (current_compaction_score < 5) { |
800 | 8 | tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds()); |
801 | 8 | } |
802 | | |
803 | | // tablet should do single compaction |
804 | 257 | if (current_compaction_score > single_compact_highest_score && |
805 | 257 | tablet_ptr->should_fetch_from_peer()) { |
806 | 3 | bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
807 | 3 | cumulative_compaction_policy); |
808 | 3 | if (ret) { |
809 | 3 | single_compact_highest_score = current_compaction_score; |
810 | 3 | best_single_compact_tablet = tablet_ptr; |
811 | 3 | } |
812 | 3 | } |
813 | | |
814 | 257 | if (config::compaction_num_per_round > 1 && !tablet_ptr->should_fetch_from_peer()) { |
815 | 205 | TabletScore ts; |
816 | 205 | ts.score = current_compaction_score; |
817 | 205 | ts.tablet_ptr = tablet_ptr; |
818 | 205 | if ((top_tablets.size() >= config::compaction_num_per_round && |
819 | 205 | current_compaction_score > top_tablets.top().score) || |
820 | 205 | top_tablets.size() < config::compaction_num_per_round) { |
821 | 25 | bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
822 | 25 | cumulative_compaction_policy); |
823 | 25 | if (ret) { |
824 | 25 | top_tablets.push(ts); |
825 | 25 | if (top_tablets.size() > config::compaction_num_per_round) { |
826 | 0 | top_tablets.pop(); |
827 | 0 | } |
828 | 25 | if (current_compaction_score > highest_score) { |
829 | 3 | highest_score = current_compaction_score; |
830 | 3 | } |
831 | 25 | } |
832 | 25 | } |
833 | 205 | } else { |
834 | 52 | if (current_compaction_score > highest_score && !tablet_ptr->should_fetch_from_peer()) { |
835 | 3 | bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
836 | 3 | cumulative_compaction_policy); |
837 | 3 | if (ret) { |
838 | 3 | highest_score = current_compaction_score; |
839 | 3 | best_tablet = tablet_ptr; |
840 | 3 | } |
841 | 3 | } |
842 | 52 | } |
843 | 257 | }; |
844 | | |
845 | 6 | for_each_tablet(handler, filter_all_tablets); |
846 | 6 | std::vector<TabletSharedPtr> picked_tablet; |
847 | 6 | if (best_tablet != nullptr) { |
848 | 3 | VLOG_CRITICAL << "Found the best tablet for compaction. " |
849 | 0 | << "compaction_type=" << compaction_type_str |
850 | 0 | << ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path() |
851 | 0 | << ", highest_score=" << highest_score |
852 | 0 | << ", fetch from peer: " << best_tablet->should_fetch_from_peer(); |
853 | 3 | picked_tablet.emplace_back(std::move(best_tablet)); |
854 | 3 | } |
855 | | |
856 | 6 | std::vector<TabletSharedPtr> reverse_top_tablets; |
857 | 31 | while (!top_tablets.empty()) { |
858 | 25 | reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr); |
859 | 25 | top_tablets.pop(); |
860 | 25 | } |
861 | | |
862 | 31 | for (auto it = reverse_top_tablets.rbegin(); it != reverse_top_tablets.rend(); ++it) { |
863 | 25 | picked_tablet.emplace_back(*it); |
864 | 25 | } |
865 | | |
866 | | // pick single compaction tablet needs the highest score |
867 | 6 | if (best_single_compact_tablet != nullptr && single_compact_highest_score >= highest_score) { |
868 | 2 | VLOG_CRITICAL << "Found the best tablet for single compaction. " |
869 | 0 | << "compaction_type=" << compaction_type_str |
870 | 0 | << ", tablet_id=" << best_single_compact_tablet->tablet_id() |
871 | 0 | << ", path=" << data_dir->path() |
872 | 0 | << ", highest_score=" << single_compact_highest_score << ", fetch from peer: " |
873 | 0 | << best_single_compact_tablet->should_fetch_from_peer(); |
874 | 2 | picked_tablet.emplace_back(std::move(best_single_compact_tablet)); |
875 | 2 | } |
876 | 6 | *score = highest_score > single_compact_highest_score ? highest_score |
877 | 6 | : single_compact_highest_score; |
878 | 6 | return picked_tablet; |
879 | 6 | } |
880 | | |
881 | | Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, |
882 | | TSchemaHash schema_hash, std::string_view meta_binary, |
883 | | bool update_meta, bool force, bool restore, |
884 | 4 | bool check_path) { |
885 | 4 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
886 | 4 | Status status = tablet_meta->deserialize(meta_binary); |
887 | 4 | if (!status.ok()) { |
888 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
889 | 0 | "fail to load tablet because can not parse meta_binary string. tablet_id={}, " |
890 | 0 | "schema_hash={}, path={}, status={}", |
891 | 0 | tablet_id, schema_hash, data_dir->path(), status); |
892 | 0 | } |
893 | | |
894 | | // check if tablet meta is valid |
895 | 4 | if (tablet_meta->tablet_id() != tablet_id || tablet_meta->schema_hash() != schema_hash) { |
896 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
897 | 0 | "fail to load tablet because meet invalid tablet meta. trying to load " |
898 | 0 | "tablet(tablet_id={}, schema_hash={}), but meet tablet={}, path={}", |
899 | 0 | tablet_id, schema_hash, tablet_meta->tablet_id(), data_dir->path()); |
900 | 0 | } |
901 | 4 | if (tablet_meta->tablet_uid().hi == 0 && tablet_meta->tablet_uid().lo == 0) { |
902 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
903 | 0 | "fail to load tablet because its uid == 0. tablet={}, path={}", |
904 | 0 | tablet_meta->tablet_id(), data_dir->path()); |
905 | 0 | } |
906 | | |
907 | 4 | if (restore) { |
908 | | // we're restoring tablet from trash, tablet state should be changed from shutdown back to running |
909 | 0 | tablet_meta->set_tablet_state(TABLET_RUNNING); |
910 | 0 | } |
911 | | |
912 | 4 | if (tablet_meta->partition_id() == 0) { |
913 | 1 | LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0"; |
914 | 1 | } |
915 | | |
916 | 4 | TabletSharedPtr tablet = std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
917 | | |
918 | | // NOTE: method load_tablet_from_meta could be called by two cases as below |
919 | | // case 1: BE start; |
920 | | // case 2: Clone Task/Restore |
921 | | // For case 1 doesn't need path check because BE is just starting and not ready, |
922 | | // just check tablet meta status to judge whether tablet is delete is enough. |
923 | | // For case 2, If a tablet has just been copied to local BE, |
924 | | // it may be cleared by gc-thread(see perform_tablet_gc) because the tablet meta may not be loaded to memory. |
925 | | // So clone task should check path and then failed and retry in this case. |
926 | 4 | if (check_path) { |
927 | 4 | bool exists = true; |
928 | 4 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(tablet->tablet_path(), &exists)); |
929 | 4 | if (!exists) { |
930 | 0 | return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
931 | 0 | "tablet path not exists, create tablet failed, path={}", tablet->tablet_path()); |
932 | 0 | } |
933 | 4 | } |
934 | | |
935 | 4 | if (tablet->tablet_meta()->tablet_state() == TABLET_SHUTDOWN) { |
936 | 0 | { |
937 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
938 | 0 | _shutdown_tablets.push_back(tablet); |
939 | 0 | } |
940 | 0 | return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
941 | 0 | "fail to load tablet because it is to be deleted. tablet_id={}, schema_hash={}, " |
942 | 0 | "path={}", |
943 | 0 | tablet_id, schema_hash, data_dir->path()); |
944 | 0 | } |
945 | | // NOTE: We do not check tablet's initial version here, because if BE restarts when |
946 | | // one tablet is doing schema-change, we may meet empty tablet. |
947 | 4 | if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { |
948 | | // tablet state is invalid, drop tablet |
949 | 0 | return Status::Error<TABLE_INDEX_VALIDATE_ERROR>( |
950 | 0 | "fail to load tablet. it is in running state but without delta. tablet={}, path={}", |
951 | 0 | tablet->tablet_id(), data_dir->path()); |
952 | 0 | } |
953 | | |
954 | 4 | RETURN_NOT_OK_STATUS_WITH_WARN( |
955 | 4 | tablet->init(), |
956 | 4 | strings::Substitute("tablet init failed. tablet=$0", tablet->tablet_id())); |
957 | | |
958 | 4 | RuntimeProfile profile("CreateTablet"); |
959 | 4 | std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id)); |
960 | 4 | RETURN_NOT_OK_STATUS_WITH_WARN( |
961 | 4 | _add_tablet_unlocked(tablet_id, tablet, update_meta, force, &profile), |
962 | 4 | strings::Substitute("fail to add tablet. tablet=$0", tablet->tablet_id())); |
963 | | |
964 | 4 | return Status::OK(); |
965 | 4 | } |
966 | | |
967 | | Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, |
968 | | SchemaHash schema_hash, const string& schema_hash_path, |
969 | 3 | bool force, bool restore) { |
970 | 3 | LOG(INFO) << "begin to load tablet from dir. " |
971 | 3 | << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash |
972 | 3 | << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; |
973 | | // not add lock here, because load_tablet_from_meta already add lock |
974 | 3 | std::string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id); |
975 | | // should change shard id before load tablet |
976 | 3 | std::string shard_path = |
977 | 3 | path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path))); |
978 | 3 | std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1); |
979 | 3 | int32_t shard = stol(shard_str); |
980 | | |
981 | 3 | bool exists = false; |
982 | 3 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(header_path, &exists)); |
983 | 3 | if (!exists) { |
984 | 0 | return Status::Error<NOT_FOUND>("fail to find header file. [header_path={}]", header_path); |
985 | 0 | } |
986 | | |
987 | 3 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
988 | 3 | if (!tablet_meta->create_from_file(header_path).ok()) { |
989 | 0 | return Status::Error<ENGINE_LOAD_INDEX_TABLE_ERROR>( |
990 | 0 | "fail to load tablet_meta. file_path={}", header_path); |
991 | 0 | } |
992 | 3 | TabletUid tablet_uid = TabletUid::gen_uid(); |
993 | | |
994 | | // remove rowset binlog metas |
995 | 3 | auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", schema_hash_path); |
996 | 3 | bool binlog_metas_file_exists = false; |
997 | 3 | auto file_exists_status = |
998 | 3 | io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); |
999 | 3 | if (!file_exists_status.ok()) { |
1000 | 0 | return file_exists_status; |
1001 | 0 | } |
1002 | 3 | bool contain_binlog = false; |
1003 | 3 | RowsetBinlogMetasPB rowset_binlog_metas_pb; |
1004 | 3 | if (binlog_metas_file_exists) { |
1005 | 0 | auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); |
1006 | 0 | if (binlog_meta_filesize > 0) { |
1007 | 0 | contain_binlog = true; |
1008 | 0 | RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); |
1009 | 0 | VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; |
1010 | 0 | } |
1011 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); |
1012 | 0 | } |
1013 | 3 | if (contain_binlog) { |
1014 | 0 | auto binlog_dir = fmt::format("{}/_binlog", schema_hash_path); |
1015 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); |
1016 | | |
1017 | 0 | std::vector<io::FileInfo> files; |
1018 | 0 | RETURN_IF_ERROR( |
1019 | 0 | io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); |
1020 | 0 | for (auto& file : files) { |
1021 | 0 | auto& filename = file.file_name; |
1022 | 0 | std::string new_suffix; |
1023 | 0 | std::string old_suffix; |
1024 | |
|
1025 | 0 | if (filename.ends_with(".binlog")) { |
1026 | 0 | old_suffix = ".binlog"; |
1027 | 0 | new_suffix = ".dat"; |
1028 | 0 | } else if (filename.ends_with(".binlog-index")) { |
1029 | 0 | old_suffix = ".binlog-index"; |
1030 | 0 | new_suffix = ".idx"; |
1031 | 0 | } else { |
1032 | 0 | continue; |
1033 | 0 | } |
1034 | | |
1035 | 0 | std::string new_filename = filename; |
1036 | 0 | new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(), |
1037 | 0 | new_suffix); |
1038 | 0 | auto from = fmt::format("{}/{}", schema_hash_path, filename); |
1039 | 0 | auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); |
1040 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); |
1041 | 0 | } |
1042 | | |
1043 | 0 | auto* meta = store->get_meta(); |
1044 | | // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas |
1045 | 0 | RETURN_IF_ERROR( |
1046 | 0 | RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); |
1047 | 0 | } |
1048 | | |
1049 | | // has to change shard id here, because meta file maybe copied from other source |
1050 | | // its shard is different from local shard |
1051 | 3 | tablet_meta->set_shard_id(shard); |
1052 | | // load dir is called by clone, restore, storage migration |
1053 | | // should change tablet uid when tablet object changed |
1054 | 3 | tablet_meta->set_tablet_uid(std::move(tablet_uid)); |
1055 | 3 | std::string meta_binary; |
1056 | 3 | tablet_meta->serialize(&meta_binary); |
1057 | 3 | RETURN_NOT_OK_STATUS_WITH_WARN( |
1058 | 3 | load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force, restore, |
1059 | 3 | true), |
1060 | 3 | strings::Substitute("fail to load tablet. header_path=$0", header_path)); |
1061 | | |
1062 | 3 | return Status::OK(); |
1063 | 3 | } |
1064 | | |
1065 | 0 | Status TabletManager::report_tablet_info(TTabletInfo* tablet_info) { |
1066 | 0 | LOG(INFO) << "begin to process report tablet info." |
1067 | 0 | << "tablet_id=" << tablet_info->tablet_id; |
1068 | |
|
1069 | 0 | Status res = Status::OK(); |
1070 | |
|
1071 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id); |
1072 | 0 | if (tablet == nullptr) { |
1073 | 0 | return Status::Error<TABLE_NOT_FOUND>("can't find tablet={}", tablet_info->tablet_id); |
1074 | 0 | } |
1075 | | |
1076 | 0 | tablet->build_tablet_report_info(tablet_info); |
1077 | 0 | VLOG_TRACE << "success to process report tablet info."; |
1078 | 0 | return res; |
1079 | 0 | } |
1080 | | |
1081 | 0 | void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { |
1082 | 0 | DCHECK(tablets_info != nullptr); |
1083 | 0 | VLOG_NOTICE << "begin to build all report tablets info"; |
1084 | | |
1085 | | // build the expired txn map first, outside the tablet map lock |
1086 | 0 | std::map<TabletInfo, std::vector<int64_t>> expire_txn_map; |
1087 | 0 | _engine.txn_manager()->build_expire_txn_map(&expire_txn_map); |
1088 | 0 | LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; |
1089 | |
|
1090 | 0 | HistogramStat tablet_version_num_hist; |
1091 | 0 | auto local_cache = std::make_shared<std::vector<TTabletStat>>(); |
1092 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1093 | 0 | auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; |
1094 | 0 | TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back(); |
1095 | 0 | tablet->build_tablet_report_info(&tablet_info, true, true); |
1096 | | // find expired transaction corresponding to this tablet |
1097 | 0 | TabletInfo tinfo(tablet->tablet_id(), tablet->tablet_uid()); |
1098 | 0 | auto find = expire_txn_map.find(tinfo); |
1099 | 0 | if (find != expire_txn_map.end()) { |
1100 | 0 | tablet_info.__set_transaction_ids(find->second); |
1101 | 0 | expire_txn_map.erase(find); |
1102 | 0 | } |
1103 | 0 | tablet_version_num_hist.add(tablet_info.total_version_count); |
1104 | 0 | auto& t_tablet_stat = local_cache->emplace_back(); |
1105 | 0 | t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); |
1106 | 0 | t_tablet_stat.__set_data_size(tablet_info.data_size); |
1107 | 0 | t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); |
1108 | 0 | t_tablet_stat.__set_row_count(tablet_info.row_count); |
1109 | 0 | t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); |
1110 | 0 | t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); |
1111 | 0 | t_tablet_stat.__set_visible_version(tablet_info.version); |
1112 | 0 | t_tablet_stat.__set_local_index_size(tablet_info.local_index_size); |
1113 | 0 | t_tablet_stat.__set_local_segment_size(tablet_info.local_segment_size); |
1114 | 0 | t_tablet_stat.__set_remote_index_size(tablet_info.remote_index_size); |
1115 | 0 | t_tablet_stat.__set_remote_segment_size(tablet_info.remote_segment_size); |
1116 | 0 | }; |
1117 | 0 | for_each_tablet(handler, filter_all_tablets); |
1118 | |
|
1119 | 0 | { |
1120 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
1121 | 0 | _tablet_stat_list_cache.swap(local_cache); |
1122 | 0 | } |
1123 | 0 | DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( |
1124 | 0 | tablet_version_num_hist); |
1125 | 0 | LOG(INFO) << "success to build all report tablets info. tablet_count=" << tablets_info->size(); |
1126 | 0 | } |
1127 | | |
1128 | 5 | Status TabletManager::start_trash_sweep() { |
1129 | 5 | DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK); |
1130 | 5 | std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock); |
1131 | 5 | if (!lock.try_lock()) { |
1132 | 0 | return Status::OK(); |
1133 | 0 | } |
1134 | | |
1135 | 5 | for_each_tablet([](const TabletSharedPtr& tablet) { tablet->delete_expired_stale_rowset(); }, |
1136 | 5 | filter_all_tablets); |
1137 | | |
1138 | 5 | std::list<TabletSharedPtr>::iterator last_it; |
1139 | 5 | { |
1140 | 5 | std::shared_lock rdlock(_shutdown_tablets_lock); |
1141 | 5 | last_it = _shutdown_tablets.begin(); |
1142 | 5 | if (last_it == _shutdown_tablets.end()) { |
1143 | 0 | return Status::OK(); |
1144 | 0 | } |
1145 | 5 | } |
1146 | | |
1147 | 10 | auto get_batch_tablets = [this, &last_it](int limit) { |
1148 | 10 | std::vector<TabletSharedPtr> batch_tablets; |
1149 | 10 | std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
1150 | 241 | while (last_it != _shutdown_tablets.end() && batch_tablets.size() < limit) { |
1151 | | // it means current tablet is referenced by other thread |
1152 | 231 | if (last_it->use_count() > 1) { |
1153 | 6 | last_it++; |
1154 | 225 | } else { |
1155 | 225 | batch_tablets.push_back(*last_it); |
1156 | 225 | last_it = _shutdown_tablets.erase(last_it); |
1157 | 225 | } |
1158 | 231 | } |
1159 | | |
1160 | 10 | return batch_tablets; |
1161 | 10 | }; |
1162 | | |
1163 | 5 | std::list<TabletSharedPtr> failed_tablets; |
1164 | | // return true if need continue delete |
1165 | 6 | auto delete_one_batch = [this, get_batch_tablets, &failed_tablets]() -> bool { |
1166 | 6 | int limit = 200; |
1167 | 10 | for (;;) { |
1168 | 10 | auto batch_tablets = get_batch_tablets(limit); |
1169 | 225 | for (const auto& tablet : batch_tablets) { |
1170 | 225 | if (_move_tablet_to_trash(tablet)) { |
1171 | 225 | limit--; |
1172 | 225 | } else { |
1173 | 0 | failed_tablets.push_back(tablet); |
1174 | 0 | } |
1175 | 225 | } |
1176 | 10 | if (limit <= 0) { |
1177 | 1 | return true; |
1178 | 1 | } |
1179 | 9 | if (batch_tablets.empty()) { |
1180 | 5 | return false; |
1181 | 5 | } |
1182 | 9 | } |
1183 | | |
1184 | 0 | return false; |
1185 | 6 | }; |
1186 | | |
1187 | 6 | while (delete_one_batch()) { |
1188 | | #ifndef BE_TEST |
1189 | | sleep(1); |
1190 | | #endif |
1191 | 1 | } |
1192 | | |
1193 | 5 | if (!failed_tablets.empty()) { |
1194 | 0 | std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock); |
1195 | 0 | _shutdown_tablets.splice(_shutdown_tablets.end(), failed_tablets); |
1196 | 0 | } |
1197 | | |
1198 | 5 | return Status::OK(); |
1199 | 5 | } |
1200 | | |
1201 | 225 | bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) { |
1202 | 225 | RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash")); |
1203 | 225 | Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }}; |
1204 | | |
1205 | 225 | TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id()); |
1206 | 225 | if (tablet_in_not_shutdown) { |
1207 | 0 | TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash(); |
1208 | 0 | size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash(); |
1209 | 0 | if (tablet->schema_hash() == schema_hash_not_shutdown && |
1210 | 0 | tablet->data_dir()->path_hash() == path_hash_not_shutdown) { |
1211 | 0 | tablet->clear_cache(); |
1212 | | // shard_id in memory not eq shard_id in shutdown |
1213 | 0 | if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) { |
1214 | 0 | LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id=" |
1215 | 0 | << tablet_in_not_shutdown->tablet_id() |
1216 | 0 | << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
1217 | 0 | << ", shutdown tablet path=" << tablet->tablet_path(); |
1218 | 0 | return tablet->data_dir()->move_to_trash(tablet->tablet_path()); |
1219 | 0 | } else { |
1220 | 0 | LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id=" |
1221 | 0 | << tablet_in_not_shutdown->tablet_id() |
1222 | 0 | << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
1223 | 0 | << ", shutdown tablet path=" << tablet->tablet_path(); |
1224 | 0 | return true; |
1225 | 0 | } |
1226 | 0 | } |
1227 | 0 | } |
1228 | | |
1229 | 225 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
1230 | 225 | int64_t get_meta_ts = MonotonicMicros(); |
1231 | 225 | Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(), |
1232 | 225 | tablet->schema_hash(), tablet_meta); |
1233 | 225 | if (check_st.ok()) { |
1234 | 225 | if (tablet_meta->tablet_state() != TABLET_SHUTDOWN || |
1235 | 225 | tablet_meta->tablet_uid() != tablet->tablet_uid()) { |
1236 | 0 | LOG(WARNING) << "tablet's state changed to normal, skip remove dirs" |
1237 | 0 | << " tablet id = " << tablet_meta->tablet_id() |
1238 | 0 | << " schema hash = " << tablet_meta->schema_hash() |
1239 | 0 | << " old tablet_uid=" << tablet->tablet_uid() |
1240 | 0 | << " cur tablet_uid=" << tablet_meta->tablet_uid(); |
1241 | 0 | return true; |
1242 | 0 | } |
1243 | | |
1244 | 225 | tablet->clear_cache(); |
1245 | | |
1246 | | // move data to trash |
1247 | 225 | const auto& tablet_path = tablet->tablet_path(); |
1248 | 225 | bool exists = false; |
1249 | 225 | Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
1250 | 225 | if (!exists_st) { |
1251 | 0 | return false; |
1252 | 0 | } |
1253 | 225 | if (exists) { |
1254 | | // take snapshot of tablet meta |
1255 | 225 | auto meta_file_path = fmt::format("{}/{}.hdr", tablet_path, tablet->tablet_id()); |
1256 | 225 | int64_t save_meta_ts = MonotonicMicros(); |
1257 | 225 | auto save_st = tablet->tablet_meta()->save(meta_file_path); |
1258 | 225 | if (!save_st.ok()) { |
1259 | 0 | LOG(WARNING) << "failed to save meta, tablet_id=" << tablet_meta->tablet_id() |
1260 | 0 | << ", tablet_uid=" << tablet_meta->tablet_uid() |
1261 | 0 | << ", error=" << save_st; |
1262 | 0 | return false; |
1263 | 0 | } |
1264 | 225 | int64_t now = MonotonicMicros(); |
1265 | 225 | LOG(INFO) << "start to move tablet to trash. " << tablet_path |
1266 | 225 | << ". rocksdb get meta cost " << (save_meta_ts - get_meta_ts) |
1267 | 225 | << " us, rocksdb save meta cost " << (now - save_meta_ts) << " us"; |
1268 | 225 | Status rm_st = tablet->data_dir()->move_to_trash(tablet_path); |
1269 | 225 | if (!rm_st.ok()) { |
1270 | 0 | LOG(WARNING) << "fail to move dir to trash. " << tablet_path; |
1271 | 0 | return false; |
1272 | 0 | } |
1273 | 225 | } |
1274 | | // remove tablet meta |
1275 | 225 | auto remove_st = TabletMetaManager::remove(tablet->data_dir(), tablet->tablet_id(), |
1276 | 225 | tablet->schema_hash()); |
1277 | 225 | if (!remove_st.ok()) { |
1278 | 0 | LOG(WARNING) << "failed to remove meta, tablet_id=" << tablet_meta->tablet_id() |
1279 | 0 | << ", tablet_uid=" << tablet_meta->tablet_uid() << ", error=" << remove_st; |
1280 | 0 | return false; |
1281 | 0 | } |
1282 | 225 | LOG(INFO) << "successfully move tablet to trash. " |
1283 | 225 | << "tablet_id=" << tablet->tablet_id() |
1284 | 225 | << ", schema_hash=" << tablet->schema_hash() << ", tablet_path=" << tablet_path; |
1285 | 225 | return true; |
1286 | 225 | } else { |
1287 | 0 | tablet->clear_cache(); |
1288 | | // if could not find tablet info in meta store, then check if dir existed |
1289 | 0 | const auto& tablet_path = tablet->tablet_path(); |
1290 | 0 | bool exists = false; |
1291 | 0 | Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
1292 | 0 | if (!exists_st) { |
1293 | 0 | return false; |
1294 | 0 | } |
1295 | 0 | if (exists) { |
1296 | 0 | if (check_st.is<META_KEY_NOT_FOUND>()) { |
1297 | 0 | LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path " |
1298 | 0 | << "tablet_id=" << tablet->tablet_id() |
1299 | 0 | << ", schema_hash=" << tablet->schema_hash() |
1300 | 0 | << ", delete tablet_path=" << tablet_path; |
1301 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path)); |
1302 | 0 | RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path)); |
1303 | 0 | return true; |
1304 | 0 | } |
1305 | 0 | LOG(WARNING) << "errors while load meta from store, skip this tablet. " |
1306 | 0 | << "tablet_id=" << tablet->tablet_id() |
1307 | 0 | << ", schema_hash=" << tablet->schema_hash(); |
1308 | 0 | return false; |
1309 | 0 | } else { |
1310 | 0 | LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. " |
1311 | 0 | << "tablet_id=" << tablet->tablet_id() |
1312 | 0 | << ", schema_hash=" << tablet->schema_hash() |
1313 | 0 | << ", tablet_path=" << tablet_path; |
1314 | 0 | return true; |
1315 | 0 | } |
1316 | 0 | } |
1317 | 225 | } |
1318 | | |
1319 | 493 | Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) { |
1320 | 493 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1321 | 493 | std::thread::id thread_id = std::this_thread::get_id(); |
1322 | 493 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1323 | 493 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1324 | 493 | search == shard.tablets_under_transition.end()) { |
1325 | | // not found |
1326 | 491 | shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1); |
1327 | 491 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1328 | 491 | << ", lock times=1, thread_id_in_map=" << thread_id; |
1329 | 491 | return Status::OK(); |
1330 | 491 | } else { |
1331 | | // found |
1332 | 2 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1333 | 2 | if (thread_id != thread_id_in_map) { |
1334 | | // other thread, failed |
1335 | 0 | LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r |
1336 | 0 | << ", thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason |
1337 | 0 | << ", thread_id=" << thread_id; |
1338 | 0 | return Status::InternalError<false>("{} failed try later, tablet_id={}", reason, |
1339 | 0 | tablet_id); |
1340 | 0 | } |
1341 | | // add lock times |
1342 | 2 | ++lock_times; |
1343 | 2 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1344 | 2 | << ", lock times=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
1345 | 2 | return Status::OK(); |
1346 | 2 | } |
1347 | 493 | } |
1348 | | |
1349 | 493 | void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) { |
1350 | 493 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1351 | 493 | std::thread::id thread_id = std::this_thread::get_id(); |
1352 | 493 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1353 | 493 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1354 | 493 | search == shard.tablets_under_transition.end()) { |
1355 | | // impossible, bug |
1356 | 0 | DCHECK(false) << "tablet " << tablet_id |
1357 | 0 | << " must be found, before unreg must have been reg"; |
1358 | 493 | } else { |
1359 | 493 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1360 | 493 | if (thread_id_in_map != thread_id) { |
1361 | | // impossible, bug |
1362 | 0 | DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread"; |
1363 | 0 | } |
1364 | | // sub lock times |
1365 | 493 | --lock_times; |
1366 | 493 | if (lock_times != 0) { |
1367 | 2 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1368 | 2 | << ", left=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
1369 | 491 | } else { |
1370 | 491 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1371 | 491 | << ", thread_id_in_map=" << thread_id_in_map; |
1372 | 491 | shard.tablets_under_transition.erase(tablet_id); |
1373 | 491 | } |
1374 | 493 | } |
1375 | 493 | } |
1376 | | |
1377 | | void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, |
1378 | | SchemaHash schema_hash, |
1379 | | const string& schema_hash_path, |
1380 | 10 | int16_t shard_id) { |
1381 | | // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks |
1382 | | // create tablet and load tablet task should check whether the dir exists |
1383 | 10 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1384 | 10 | std::shared_lock rdlock(shard.lock); |
1385 | | |
1386 | | // check if meta already exists |
1387 | 10 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
1388 | 10 | Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta); |
1389 | 10 | if (check_st.ok() && tablet_meta->shard_id() == shard_id) { |
1390 | 0 | return; |
1391 | 0 | } |
1392 | | |
1393 | 10 | LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path; |
1394 | | |
1395 | 10 | bool succ = register_transition_tablet(tablet_id, "path gc"); |
1396 | 10 | if (!succ) { |
1397 | 0 | return; |
1398 | 0 | } |
1399 | 10 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }}; |
1400 | | |
1401 | 10 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1402 | 10 | if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) { |
1403 | 0 | LOG(INFO) << "tablet exists, skip delete the path " << schema_hash_path; |
1404 | 0 | return; |
1405 | 0 | } |
1406 | | |
1407 | | // TODO(ygl): may do other checks in the future |
1408 | 10 | bool exists = false; |
1409 | 10 | Status exists_st = io::global_local_filesystem()->exists(schema_hash_path, &exists); |
1410 | 10 | if (exists_st && exists) { |
1411 | 10 | LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path; |
1412 | 10 | Status rm_st = data_dir->move_to_trash(schema_hash_path); |
1413 | 10 | if (!rm_st.ok()) { |
1414 | 0 | LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path; |
1415 | 10 | } else { |
1416 | 10 | LOG(INFO) << "move path " << schema_hash_path << " to trash successfully"; |
1417 | 10 | } |
1418 | 10 | } |
1419 | 10 | } |
1420 | | |
1421 | | void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map, |
1422 | 0 | size_t* tablet_count) { |
1423 | 0 | DCHECK(tablet_count); |
1424 | 0 | *tablet_count = 0; |
1425 | 0 | auto filter = [path_map, tablet_count](Tablet* t) -> bool { |
1426 | 0 | ++(*tablet_count); |
1427 | 0 | auto iter = path_map->find(t->data_dir()->path()); |
1428 | 0 | return iter != path_map->end() && iter->second.is_used; |
1429 | 0 | }; |
1430 | |
|
1431 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1432 | 0 | auto& data_dir_info = (*path_map)[tablet->data_dir()->path()]; |
1433 | 0 | data_dir_info.local_used_capacity += tablet->tablet_local_size(); |
1434 | 0 | data_dir_info.remote_used_capacity += tablet->tablet_remote_size(); |
1435 | 0 | }; |
1436 | |
|
1437 | 0 | for_each_tablet(handler, filter); |
1438 | 0 | } |
1439 | | |
1440 | | void TabletManager::get_partition_related_tablets(int64_t partition_id, |
1441 | 0 | std::set<TabletInfo>* tablet_infos) { |
1442 | 0 | std::shared_lock rdlock(_partitions_lock); |
1443 | 0 | auto it = _partitions.find(partition_id); |
1444 | 0 | if (it != _partitions.end()) { |
1445 | 0 | *tablet_infos = it->second.tablets; |
1446 | 0 | } |
1447 | 0 | } |
1448 | | |
1449 | 0 | void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>* partitions_version) { |
1450 | 0 | std::shared_lock rdlock(_partitions_lock); |
1451 | 0 | for (const auto& [partition_id, partition] : _partitions) { |
1452 | 0 | partitions_version->insert( |
1453 | 0 | {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); |
1454 | 0 | } |
1455 | 0 | } |
1456 | | |
1457 | | void TabletManager::update_partitions_visible_version( |
1458 | 0 | const std::map<int64_t, int64_t>& partitions_version) { |
1459 | 0 | std::shared_lock rdlock(_partitions_lock); |
1460 | 0 | for (auto [partition_id, version] : partitions_version) { |
1461 | 0 | auto it = _partitions.find(partition_id); |
1462 | 0 | if (it != _partitions.end()) { |
1463 | 0 | it->second.visible_version->update_version_monoto(version); |
1464 | 0 | } |
1465 | 0 | } |
1466 | 0 | } |
1467 | | |
1468 | 0 | void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { |
1469 | 0 | auto filter = [data_dir](Tablet* tablet) -> bool { |
1470 | 0 | return tablet->tablet_state() == TABLET_RUNNING && |
1471 | 0 | tablet->data_dir()->path_hash() == data_dir->path_hash() && tablet->is_used() && |
1472 | 0 | tablet->init_succeeded(); |
1473 | 0 | }; |
1474 | |
|
1475 | 0 | std::vector<TabletSharedPtr> related_tablets = get_all_tablet(filter); |
1476 | 0 | int counter = 0; |
1477 | 0 | MonotonicStopWatch watch; |
1478 | 0 | watch.start(); |
1479 | 0 | for (TabletSharedPtr tablet : related_tablets) { |
1480 | 0 | if (tablet->do_tablet_meta_checkpoint()) { |
1481 | 0 | ++counter; |
1482 | 0 | } |
1483 | 0 | } |
1484 | 0 | int64_t cost = watch.elapsed_time() / 1000 / 1000; |
1485 | 0 | LOG(INFO) << "finish to do meta checkpoint on dir: " << data_dir->path() |
1486 | 0 | << ", number: " << counter << ", cost(ms): " << cost; |
1487 | 0 | } |
1488 | | |
1489 | | Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, |
1490 | | const bool is_schema_change, |
1491 | | const Tablet* base_tablet, |
1492 | 300 | TabletMetaSharedPtr* tablet_meta) { |
1493 | 300 | uint32_t next_unique_id = 0; |
1494 | 300 | std::unordered_map<uint32_t, uint32_t> col_idx_to_unique_id; |
1495 | 300 | if (!is_schema_change) { |
1496 | 2.15k | for (uint32_t col_idx = 0; col_idx < request.tablet_schema.columns.size(); ++col_idx) { |
1497 | 1.85k | col_idx_to_unique_id[col_idx] = col_idx; |
1498 | 1.85k | } |
1499 | 300 | next_unique_id = request.tablet_schema.columns.size(); |
1500 | 300 | } else { |
1501 | 0 | next_unique_id = base_tablet->next_unique_id(); |
1502 | 0 | auto& new_columns = request.tablet_schema.columns; |
1503 | 0 | for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) { |
1504 | 0 | const TColumn& column = new_columns[new_col_idx]; |
1505 | | // For schema change, compare old_tablet and new_tablet: |
1506 | | // 1. if column exist in both new_tablet and old_tablet, choose the column's |
1507 | | // unique_id in old_tablet to be the column's ordinal number in new_tablet |
1508 | | // 2. if column exists only in new_tablet, assign next_unique_id of old_tablet |
1509 | | // to the new column |
1510 | 0 | int32_t old_col_idx = base_tablet->tablet_schema()->field_index(column.column_name); |
1511 | 0 | if (old_col_idx != -1) { |
1512 | 0 | uint32_t old_unique_id = |
1513 | 0 | base_tablet->tablet_schema()->column(old_col_idx).unique_id(); |
1514 | 0 | col_idx_to_unique_id[new_col_idx] = old_unique_id; |
1515 | 0 | } else { |
1516 | | // Not exist in old tablet, it is a new added column |
1517 | 0 | col_idx_to_unique_id[new_col_idx] = next_unique_id++; |
1518 | 0 | } |
1519 | 0 | } |
1520 | 0 | } |
1521 | 300 | VLOG_NOTICE << "creating tablet meta. next_unique_id=" << next_unique_id; |
1522 | | |
1523 | | // We generate a new tablet_uid for this new tablet. |
1524 | 300 | uint64_t shard_id = store->get_shard(); |
1525 | 300 | *tablet_meta = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id, |
1526 | 300 | col_idx_to_unique_id); |
1527 | 300 | if (request.__isset.storage_format) { |
1528 | 52 | if (request.storage_format == TStorageFormat::DEFAULT) { |
1529 | 0 | (*tablet_meta)->set_preferred_rowset_type(_engine.default_rowset_type()); |
1530 | 52 | } else if (request.storage_format == TStorageFormat::V1) { |
1531 | 0 | (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); |
1532 | 52 | } else if (request.storage_format == TStorageFormat::V2) { |
1533 | 52 | (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); |
1534 | 52 | } else { |
1535 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("invalid TStorageFormat: {}", |
1536 | 0 | request.storage_format); |
1537 | 0 | } |
1538 | 52 | } |
1539 | 300 | return Status::OK(); |
1540 | 300 | } |
1541 | | |
1542 | 3.56k | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { |
1543 | 3.56k | VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id; |
1544 | 3.56k | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
1545 | 3.56k | const auto& iter = tablet_map.find(tablet_id); |
1546 | 3.56k | if (iter != tablet_map.end()) { |
1547 | 3.01k | return iter->second; |
1548 | 3.01k | } |
1549 | 552 | return nullptr; |
1550 | 3.56k | } |
1551 | | |
1552 | 304 | void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { |
1553 | 304 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1554 | 304 | auto& partition = _partitions[tablet->partition_id()]; |
1555 | 304 | partition.tablets.insert(tablet->get_tablet_info()); |
1556 | 304 | tablet->set_visible_version( |
1557 | 304 | std::static_pointer_cast<const VersionWithTime>(partition.visible_version)); |
1558 | 304 | } |
1559 | | |
1560 | 255 | void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { |
1561 | 255 | tablet->set_visible_version(nullptr); |
1562 | 255 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1563 | 255 | auto it = _partitions.find(tablet->partition_id()); |
1564 | 255 | if (it == _partitions.end()) { |
1565 | 0 | return; |
1566 | 0 | } |
1567 | | |
1568 | 255 | auto& tablets = it->second.tablets; |
1569 | 255 | tablets.erase(tablet->get_tablet_info()); |
1570 | 255 | if (tablets.empty()) { |
1571 | 32 | _partitions.erase(it); |
1572 | 32 | } |
1573 | 255 | } |
1574 | | |
1575 | | void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info, |
1576 | 0 | int64_t num) { |
1577 | 0 | for (const auto& tablets_shard : _tablets_shards) { |
1578 | 0 | std::shared_lock rdlock(tablets_shard.lock); |
1579 | 0 | for (const auto& item : tablets_shard.tablet_map) { |
1580 | 0 | TabletSharedPtr tablet = item.second; |
1581 | 0 | if (tablets_info.size() >= num) { |
1582 | 0 | return; |
1583 | 0 | } |
1584 | 0 | if (tablet == nullptr) { |
1585 | 0 | continue; |
1586 | 0 | } |
1587 | 0 | tablets_info.push_back(tablet->get_tablet_info()); |
1588 | 0 | } |
1589 | 0 | } |
1590 | 0 | } |
1591 | | |
1592 | 3.25k | std::shared_mutex& TabletManager::_get_tablets_shard_lock(TTabletId tabletId) { |
1593 | 3.25k | return _get_tablets_shard(tabletId).lock; |
1594 | 3.25k | } |
1595 | | |
1596 | 4.45k | TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) { |
1597 | 4.45k | return _get_tablets_shard(tabletId).tablet_map; |
1598 | 4.45k | } |
1599 | | |
1600 | 8.70k | TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tabletId) { |
1601 | 8.70k | return _tablets_shards[tabletId & _tablets_shards_mask]; |
1602 | 8.70k | } |
1603 | | |
1604 | | void TabletManager::get_tablets_distribution_on_different_disks( |
1605 | | std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk, |
1606 | 0 | std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk) { |
1607 | 0 | std::vector<DataDir*> data_dirs = _engine.get_stores(); |
1608 | 0 | std::map<int64_t, Partition> partitions; |
1609 | 0 | { |
1610 | | // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. |
1611 | | // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and |
1612 | | // copy _partitions here. |
1613 | 0 | std::shared_lock rdlock(_partitions_lock); |
1614 | 0 | partitions = _partitions; |
1615 | 0 | } |
1616 | |
|
1617 | 0 | for (const auto& [partition_id, partition] : partitions) { |
1618 | 0 | std::map<DataDir*, int64_t> tablets_num; |
1619 | 0 | std::map<DataDir*, std::vector<TabletSize>> tablets_info; |
1620 | 0 | for (auto* data_dir : data_dirs) { |
1621 | 0 | tablets_num[data_dir] = 0; |
1622 | 0 | } |
1623 | |
|
1624 | 0 | for (const auto& tablet_info : partition.tablets) { |
1625 | | // get_tablet() will hold 'tablet_shard_lock' |
1626 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); |
1627 | 0 | if (tablet == nullptr) { |
1628 | 0 | continue; |
1629 | 0 | } |
1630 | 0 | DataDir* data_dir = tablet->data_dir(); |
1631 | 0 | size_t tablet_footprint = tablet->tablet_footprint(); |
1632 | 0 | tablets_num[data_dir]++; |
1633 | 0 | TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint); |
1634 | 0 | tablets_info[data_dir].push_back(tablet_size); |
1635 | 0 | } |
1636 | 0 | tablets_num_on_disk[partition_id] = tablets_num; |
1637 | 0 | tablets_info_on_disk[partition_id] = tablets_info; |
1638 | 0 | } |
1639 | 0 | } |
1640 | | |
1641 | | struct SortCtx { |
1642 | | SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t cooldown_timestamp, |
1643 | | int64_t file_size) |
1644 | 0 | : tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {} |
1645 | | TabletSharedPtr tablet; |
1646 | | RowsetSharedPtr rowset; |
1647 | | // to ensure the tablet with -1 would always be greater than other |
1648 | | uint64_t cooldown_timestamp; |
1649 | | int64_t file_size; |
1650 | 0 | bool operator<(const SortCtx& other) const { |
1651 | 0 | if (this->cooldown_timestamp == other.cooldown_timestamp) { |
1652 | 0 | return this->file_size > other.file_size; |
1653 | 0 | } |
1654 | 0 | return this->cooldown_timestamp < other.cooldown_timestamp; |
1655 | 0 | } |
1656 | | }; |
1657 | | |
1658 | | void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets, |
1659 | | std::vector<RowsetSharedPtr>* rowsets, |
1660 | 0 | std::function<bool(const TabletSharedPtr&)> skip_tablet) { |
1661 | 0 | std::vector<SortCtx> sort_ctx_vec; |
1662 | 0 | std::vector<std::weak_ptr<Tablet>> candidates; |
1663 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { candidates.emplace_back(tablet); }, |
1664 | 0 | filter_all_tablets); |
1665 | 0 | auto get_cooldown_tablet = [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) { |
1666 | 0 | const TabletSharedPtr& tablet = t.lock(); |
1667 | 0 | RowsetSharedPtr rowset = nullptr; |
1668 | 0 | if (UNLIKELY(nullptr == tablet)) { |
1669 | 0 | return; |
1670 | 0 | } |
1671 | 0 | int64_t cooldown_timestamp = -1; |
1672 | 0 | size_t file_size = -1; |
1673 | 0 | if (!skip_tablet(tablet) && |
1674 | 0 | (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size))) { |
1675 | 0 | sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp, file_size); |
1676 | 0 | } |
1677 | 0 | }; |
1678 | 0 | std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet); |
1679 | |
|
1680 | 0 | std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end()); |
1681 | |
|
1682 | 0 | for (SortCtx& ctx : sort_ctx_vec) { |
1683 | 0 | VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id(); |
1684 | 0 | tablets->push_back(std::move(ctx.tablet)); |
1685 | 0 | rowsets->push_back(std::move(ctx.rowset)); |
1686 | 0 | } |
1687 | 0 | } |
1688 | | |
1689 | 0 | void TabletManager::get_all_tablets_storage_format(TCheckStorageFormatResult* result) { |
1690 | 0 | DCHECK(result != nullptr); |
1691 | 0 | auto handler = [result](const TabletSharedPtr& tablet) { |
1692 | 0 | if (tablet->all_beta()) { |
1693 | 0 | result->v2_tablets.push_back(tablet->tablet_id()); |
1694 | 0 | } else { |
1695 | 0 | result->v1_tablets.push_back(tablet->tablet_id()); |
1696 | 0 | } |
1697 | 0 | }; |
1698 | |
|
1699 | 0 | for_each_tablet(handler, filter_all_tablets); |
1700 | 0 | result->__isset.v1_tablets = true; |
1701 | 0 | result->__isset.v2_tablets = true; |
1702 | 0 | } |
1703 | | |
1704 | 0 | std::set<int64_t> TabletManager::check_all_tablet_segment(bool repair) { |
1705 | 0 | std::set<int64_t> bad_tablets; |
1706 | 0 | std::map<int64_t, std::vector<int64_t>> repair_shard_bad_tablets; |
1707 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1708 | 0 | if (!tablet->check_all_rowset_segment()) { |
1709 | 0 | int64_t tablet_id = tablet->tablet_id(); |
1710 | 0 | bad_tablets.insert(tablet_id); |
1711 | 0 | if (repair) { |
1712 | 0 | repair_shard_bad_tablets[tablet_id & _tablets_shards_mask].push_back(tablet_id); |
1713 | 0 | } |
1714 | 0 | } |
1715 | 0 | }; |
1716 | 0 | for_each_tablet(handler, filter_all_tablets); |
1717 | |
|
1718 | 0 | for (const auto& [shard_index, shard_tablets] : repair_shard_bad_tablets) { |
1719 | 0 | auto& tablets_shard = _tablets_shards[shard_index]; |
1720 | 0 | auto& tablet_map = tablets_shard.tablet_map; |
1721 | 0 | std::lock_guard<std::shared_mutex> wrlock(tablets_shard.lock); |
1722 | 0 | for (auto tablet_id : shard_tablets) { |
1723 | 0 | auto it = tablet_map.find(tablet_id); |
1724 | 0 | if (it == tablet_map.end()) { |
1725 | 0 | bad_tablets.erase(tablet_id); |
1726 | 0 | LOG(WARNING) << "Bad tablet has be removed. tablet_id=" << tablet_id; |
1727 | 0 | } else { |
1728 | 0 | const auto& tablet = it->second; |
1729 | 0 | static_cast<void>(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
1730 | 0 | tablet->save_meta(); |
1731 | 0 | { |
1732 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock( |
1733 | 0 | _shutdown_tablets_lock); |
1734 | 0 | _shutdown_tablets.push_back(tablet); |
1735 | 0 | } |
1736 | 0 | LOG(WARNING) << "There are some segments lost, set tablet to shutdown state." |
1737 | 0 | << "tablet_id=" << tablet->tablet_id() |
1738 | 0 | << ", tablet_path=" << tablet->tablet_path(); |
1739 | 0 | } |
1740 | 0 | } |
1741 | 0 | } |
1742 | |
|
1743 | 0 | return bad_tablets; |
1744 | 0 | } |
1745 | | |
1746 | | bool TabletManager::update_tablet_partition_id(::doris::TPartitionId partition_id, |
1747 | 0 | ::doris::TTabletId tablet_id) { |
1748 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
1749 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1750 | 0 | if (tablet == nullptr) { |
1751 | 0 | LOG(WARNING) << "get tablet err partition_id: " << partition_id |
1752 | 0 | << " tablet_id:" << tablet_id; |
1753 | 0 | return false; |
1754 | 0 | } |
1755 | 0 | _remove_tablet_from_partition(tablet); |
1756 | 0 | auto st = tablet->tablet_meta()->set_partition_id(partition_id); |
1757 | 0 | if (!st.ok()) { |
1758 | 0 | LOG(WARNING) << "set partition id err partition_id: " << partition_id |
1759 | 0 | << " tablet_id:" << tablet_id; |
1760 | 0 | return false; |
1761 | 0 | } |
1762 | 0 | _add_tablet_to_partition(tablet); |
1763 | 0 | return true; |
1764 | 0 | } |
1765 | | |
1766 | | void TabletManager::get_topn_tablet_delete_bitmap_score( |
1767 | 0 | uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) { |
1768 | 0 | int64_t max_delete_bitmap_score_tablet_id = 0; |
1769 | 0 | int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0; |
1770 | 0 | OlapStopWatch watch; |
1771 | 0 | uint64_t total_delete_map_count = 0; |
1772 | 0 | int n = config::check_tablet_delete_bitmap_score_top_n; |
1773 | 0 | std::vector<std::pair<std::shared_ptr<Tablet>, int64_t>> buf; |
1774 | 0 | buf.reserve(n + 1); |
1775 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1776 | 0 | uint64_t delete_bitmap_count = |
1777 | 0 | tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); |
1778 | 0 | total_delete_map_count += delete_bitmap_count; |
1779 | 0 | if (delete_bitmap_count > *max_delete_bitmap_score) { |
1780 | 0 | max_delete_bitmap_score_tablet_id = tablet->tablet_id(); |
1781 | 0 | *max_delete_bitmap_score = delete_bitmap_count; |
1782 | 0 | } |
1783 | 0 | buf.emplace_back(std::move(tablet), delete_bitmap_count); |
1784 | 0 | std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); |
1785 | 0 | if (buf.size() > n) { |
1786 | 0 | buf.pop_back(); |
1787 | 0 | } |
1788 | 0 | }; |
1789 | 0 | for_each_tablet(handler, filter_all_tablets); |
1790 | 0 | for (auto& [t, _] : buf) { |
1791 | 0 | t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score, |
1792 | 0 | &max_base_rowset_delete_bitmap_score_tablet_id); |
1793 | 0 | } |
1794 | 0 | std::stringstream ss; |
1795 | 0 | for (auto& i : buf) { |
1796 | 0 | ss << i.first->tablet_id() << ":" << i.second << ","; |
1797 | 0 | } |
1798 | 0 | LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n |
1799 | 0 | << ",tablet size=" << _tablets_shards.size() |
1800 | 0 | << ",total_delete_map_count=" << total_delete_map_count |
1801 | 0 | << ",cost(us)=" << watch.get_elapse_time_us() |
1802 | 0 | << ",max_delete_bitmap_score=" << *max_delete_bitmap_score |
1803 | 0 | << ",max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id |
1804 | 0 | << ",max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score |
1805 | 0 | << ",max_base_rowset_delete_bitmap_score_tablet_id=" |
1806 | 0 | << max_base_rowset_delete_bitmap_score_tablet_id << ",tablets=[" << ss.str() << "]"; |
1807 | 0 | } |
1808 | | |
1809 | | } // end namespace doris |