/root/doris/be/src/olap/tablet_manager.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/tablet_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/AgentService_types.h> |
22 | | #include <gen_cpp/BackendService_types.h> |
23 | | #include <gen_cpp/Descriptors_types.h> |
24 | | #include <gen_cpp/MasterService_types.h> |
25 | | #include <gen_cpp/Types_types.h> |
26 | | #include <gen_cpp/olap_file.pb.h> |
27 | | #include <re2/re2.h> |
28 | | #include <unistd.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <list> |
32 | | #include <mutex> |
33 | | #include <ostream> |
34 | | #include <string_view> |
35 | | |
36 | | #include "absl/strings/substitute.h" |
37 | | #include "bvar/bvar.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "io/fs/local_file_system.h" |
42 | | #include "olap/cumulative_compaction_time_series_policy.h" |
43 | | #include "olap/data_dir.h" |
44 | | #include "olap/olap_common.h" |
45 | | #include "olap/olap_define.h" |
46 | | #include "olap/olap_meta.h" |
47 | | #include "olap/pb_helper.h" |
48 | | #include "olap/rowset/beta_rowset.h" |
49 | | #include "olap/rowset/rowset.h" |
50 | | #include "olap/rowset/rowset_meta_manager.h" |
51 | | #include "olap/storage_engine.h" |
52 | | #include "olap/tablet.h" |
53 | | #include "olap/tablet_meta.h" |
54 | | #include "olap/tablet_meta_manager.h" |
55 | | #include "olap/tablet_schema.h" |
56 | | #include "olap/txn_manager.h" |
57 | | #include "runtime/exec_env.h" |
58 | | #include "service/backend_options.h" |
59 | | #include "util/defer_op.h" |
60 | | #include "util/doris_metrics.h" |
61 | | #include "util/histogram.h" |
62 | | #include "util/metrics.h" |
63 | | #include "util/path_util.h" |
64 | | #include "util/scoped_cleanup.h" |
65 | | #include "util/stopwatch.hpp" |
66 | | #include "util/time.h" |
67 | | #include "util/trace.h" |
68 | | #include "util/uid_util.h" |
69 | | |
70 | | namespace doris { |
71 | | class CumulativeCompactionPolicy; |
72 | | } // namespace doris |
73 | | |
74 | | using std::map; |
75 | | using std::set; |
76 | | using std::string; |
77 | | using std::vector; |
78 | | |
79 | | namespace doris { |
80 | | using namespace ErrorCode; |
81 | | |
// bvar counter exposed as "tablet_meta_schema_columns_count". In this file it is
// increased by a tablet's tablet_columns_num() when the tablet is stored in the
// tablet map, and decreased by the same amount when the tablet is dropped.
82 | | bvar::Adder<int64_t> g_tablet_meta_schema_columns_count("tablet_meta_schema_columns_count"); |
83 | | |
// Builds a manager that owns `tablet_map_lock_shard_size` tablet shards.
// The shard mask is `size - 1`; the CHECKs below require the size to be positive
// and `size & (size - 1) == 0`, i.e. a power of two, so the mask can be used to
// pick a shard from a tablet id.
84 | | TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) |
85 | | : _engine(engine), |
86 | | _tablets_shards_size(tablet_map_lock_shard_size), |
87 | 209 | _tablets_shards_mask(tablet_map_lock_shard_size - 1) { |
88 | 209 | CHECK_GT(_tablets_shards_size, 0); |
89 | 209 | CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); |
90 | 209 | _tablets_shards.resize(_tablets_shards_size); |
91 | 209 | } |
92 | | |
// No explicit teardown logic: the compiler-generated destructor is used.
93 | 209 | TabletManager::~TabletManager() = default; |
94 | | |
// Adds `tablet` to the tablet map under `tablet_id`, resolving the case where a
// tablet with the same id is already registered.
//
// No shard lock is taken in this function; on the create_tablet path in this file
// the caller acquires the shard write lock before getting here.
//
// Parameters:
//   tablet_id   - id used as the key in the tablet map.
//   tablet      - the new tablet to register.
//   update_meta - forwarded to _add_tablet_to_map_unlocked (save meta before adding).
//   force       - replace an existing tablet unconditionally; also used as keep_files
//                 when the old tablet is dropped.
//   profile     - receives the timers; dereferenced on the first statement.
//                 NOTE(review): there is no nullptr check here - confirm that every
//                 caller passes a valid profile.
//
// Returns:
//   - the result of _add_tablet_to_map_unlocked when no tablet with this id exists,
//     or when the existing one is replaced;
//   - ENGINE_INSERT_EXISTS_TABLE when `force` is false and the existing tablet has
//     the same tablet path or the same data dir, or when the new tablet has no rowset;
//   - ENGINE_INSERT_OLD_TABLET when the new tablet is older than the existing one
//     (the new tablet is then set to TABLET_SHUTDOWN and queued in _shutdown_tablets).
95 | | Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet, |
96 | 304 | bool update_meta, bool force, RuntimeProfile* profile) { |
97 | 304 | if (profile->get_counter("AddTablet") == nullptr) { |
98 | 4 | ADD_TIMER(profile, "AddTablet"); |
99 | 4 | } |
100 | 304 | Status res = Status::OK(); |
101 | 304 | VLOG_NOTICE << "begin to add tablet to TabletManager. " |
102 | 0 | << "tablet_id=" << tablet_id << ", force=" << force; |
103 | | |
104 | 304 | TabletSharedPtr existed_tablet = nullptr; |
105 | 304 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
106 | 304 | const auto& iter = tablet_map.find(tablet_id); |
107 | 304 | if (iter != tablet_map.end()) { |
108 | 4 | existed_tablet = iter->second; |
109 | 4 | } |
110 | | |
// Common case: nothing registered under this id yet, so just insert.
111 | 304 | if (existed_tablet == nullptr) { |
112 | 300 | return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/, |
113 | 300 | false /*drop_old*/, profile); |
114 | 300 | } |
115 | | // During the restore process the tablet already exists; the snapshot loader replaces the tablet's |
116 | | // rowsets and then reloads the tablet, so the tablet's path will be the same |
117 | 4 | if (!force) { |
118 | 2 | if (existed_tablet->tablet_path() == tablet->tablet_path()) { |
119 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
120 | 0 | "add the same tablet twice! tablet_id={}, tablet_path={}", tablet_id, |
121 | 0 | tablet->tablet_path()); |
122 | 0 | } |
123 | 2 | if (existed_tablet->data_dir() == tablet->data_dir()) { |
124 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
125 | 0 | "add tablet with same data dir twice! tablet_id={}", tablet_id); |
126 | 0 | } |
127 | 2 | } |
128 | | |
129 | 4 | MonotonicStopWatch watch; |
130 | 4 | watch.start(); |
131 | | |
132 | | // During storage migration, the tablet is moved to another disk, so we have to check |
133 | | // if the new tablet's rowset version is larger than the old one to prevent losing data during |
134 | | // migration |
// NOTE(review): the versions are held in int32_t locals; confirm that the type
// returned by end_version() cannot exceed that range.
135 | 4 | int64_t old_time, new_time; |
136 | 4 | int32_t old_version, new_version; |
137 | 4 | { |
// Only the existing tablet's header lock is taken (shared) while both tablets'
// max-version rowsets are read.
138 | 4 | std::shared_lock rdlock(existed_tablet->get_header_lock()); |
139 | 4 | const RowsetSharedPtr old_rowset = existed_tablet->get_rowset_with_max_version(); |
140 | 4 | const RowsetSharedPtr new_rowset = tablet->get_rowset_with_max_version(); |
141 | | // If new tablet is empty, it is a newly created schema change tablet. |
142 | | // the old tablet is dropped before add tablet. it should not exist old tablet |
143 | 4 | if (new_rowset == nullptr) { |
144 | | // it seems useless to call unlock and return here. |
145 | | // it could prevent error when log level is changed in the future. |
146 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
147 | 0 | "new tablet is empty and old tablet exists. it should not happen. tablet_id={}", |
148 | 0 | tablet_id); |
149 | 0 | } |
// -1 stands in for "no rowset" on the old tablet, so any real new rowset wins.
150 | 4 | old_time = old_rowset == nullptr ? -1 : old_rowset->creation_time(); |
151 | 4 | new_time = new_rowset->creation_time(); |
152 | 4 | old_version = old_rowset == nullptr ? -1 : old_rowset->end_version(); |
153 | 4 | new_version = new_rowset->end_version(); |
154 | 4 | } |
155 | 4 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetExistTabletVersion", "AddTablet"), |
156 | 4 | static_cast<int64_t>(watch.reset())); |
157 | | |
158 | | // In restore process, we replace all origin files in tablet dir with |
159 | | // the downloaded snapshot files. Then we try to reload tablet header. |
160 | | // force == true means we forcibly replace the Tablet in tablet_map |
161 | | // with the new one. But if we do so, the files in the tablet dir will be |
162 | | // dropped when the origin Tablet is destructed. |
163 | | // So we set keep_files == true to not delete files when the |
164 | | // origin Tablet is destructed. |
165 | | // During restore process, snapshot loader |
166 | | // replaced the old tablet's rowset with new rowsets, but the tablet path is reused, if drop files |
167 | | // here, the new rowset's file will also be dropped, so use keep files here |
168 | 4 | bool keep_files = force; |
// Replace the old tablet when forced, or when the new one is at least as fresh:
// higher end version, or equal end version with creation time not older.
169 | 4 | if (force || |
170 | 4 | (new_version > old_version || (new_version == old_version && new_time >= old_time))) { |
171 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
172 | 4 | res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files, |
173 | 4 | true /*drop_old*/, profile); |
174 | 4 | } else { |
// The new tablet is stale: mark it shut down, persist that state and queue it
// in _shutdown_tablets instead of registering it.
175 | 0 | RETURN_IF_ERROR(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
176 | 0 | tablet->save_meta(); |
177 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
178 | 0 | static_cast<int64_t>(watch.reset())); |
179 | 0 | { |
180 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
181 | 0 | _shutdown_tablets.push_back(tablet); |
182 | 0 | } |
183 |

184 | 0 | res = Status::Error<ENGINE_INSERT_OLD_TABLET>( |
185 | 0 | "set tablet to shutdown state. tablet_id={}, tablet_path={}", tablet->tablet_id(), |
186 | 0 | tablet->tablet_path()); |
187 | 0 | } |
// Reaching this point always means a duplicate id was seen, so the outcome is
// logged at WARNING level whether or not the replacement succeeded.
188 | 4 | LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res |
189 | 4 | << ", tablet_id=" << tablet_id << ", old_version=" << old_version |
190 | 4 | << ", new_version=" << new_version << ", old_time=" << old_time |
191 | 4 | << ", new_time=" << new_time |
192 | 4 | << ", old_tablet_path=" << existed_tablet->tablet_path() |
193 | 4 | << ", new_tablet_path=" << tablet->tablet_path(); |
194 | | |
195 | 4 | return res; |
196 | 4 | } |
197 | | |
// Stores `tablet` in the tablet map under `tablet_id` and performs the related
// bookkeeping. No shard lock is taken in this function.
//
// Steps, in order:
//   1. if `update_meta`, call tablet->save_meta();
//   2. if `drop_old`, drop the tablet currently registered under `tablet_id` via
//      _drop_tablet (replica id 0, `keep_files` forwarded, had_held_shard_lock=true);
//      a failure there is returned with a warning and nothing is inserted;
//   3. register the tablet into its data dir, insert it into the map, add it to
//      its partition and bump g_tablet_meta_schema_columns_count.
//
// Each step's elapsed time is recorded as a child timer of "AddTablet" in `profile`.
// Returns OK unless dropping the old tablet fails.
198 | | Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, |
199 | | const TabletSharedPtr& tablet, bool update_meta, |
200 | | bool keep_files, bool drop_old, |
201 | 304 | RuntimeProfile* profile) { |
202 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
203 | 304 | Status res = Status::OK(); |
204 | 304 | MonotonicStopWatch watch; |
205 | 304 | watch.start(); |
206 | 304 | if (update_meta) { |
207 | | // call tablet save meta in order to validate the meta |
208 | 304 | tablet->save_meta(); |
209 | 304 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
210 | 304 | static_cast<int64_t>(watch.reset())); |
211 | 304 | } |
212 | 304 | if (drop_old) { |
213 | | // If the new tablet is fresher than the existing one, then replace |
214 | | // the existing tablet with the new one. |
215 | | // Use default replica_id to ignore whether replica_id is match when drop tablet. |
216 | 4 | Status status = _drop_tablet(tablet_id, /* replica_id */ 0, keep_files, false, true); |
217 | 4 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropOldTablet", "AddTablet"), |
218 | 4 | static_cast<int64_t>(watch.reset())); |
219 | 4 | RETURN_NOT_OK_STATUS_WITH_WARN( |
220 | 4 | status, absl::Substitute("failed to drop old tablet when add new tablet. " |
221 | 4 | "tablet_id=$0", |
222 | 4 | tablet_id)); |
223 | 4 | } |
224 | | // Register tablet into DataDir, so that we can manage tablet from |
225 | | // the perspective of root path. |
226 | | // Example: unregister all tables when a bad disk found. |
227 | 304 | tablet->register_tablet_into_dir(); |
228 | 304 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
// operator[] overwrites any entry still present under this id.
229 | 304 | tablet_map[tablet_id] = tablet; |
230 | 304 | _add_tablet_to_partition(tablet); |
231 | 304 | g_tablet_meta_schema_columns_count << tablet->tablet_meta()->tablet_columns_num(); |
232 | 304 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RegisterTabletInfo", "AddTablet"), |
233 | 304 | static_cast<int64_t>(watch.reset())); |
234 | | |
235 | 304 | VLOG_NOTICE << "add tablet to map successfully." |
236 | 0 | << " tablet_id=" << tablet_id; |
237 | | |
238 | 304 | return res; |
239 | 304 | } |
240 | | |
// Returns true when `tablet_id` is present in the tablet map. Takes the shard
// lock for that id in shared mode for the duration of the lookup.
241 | 0 | bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) { |
242 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
243 | 0 | return _check_tablet_id_exist_unlocked(tablet_id); |
244 | 0 | } |
245 | | |
// Lock-free variant of check_tablet_id_exist: looks `tablet_id` up in the map
// returned by _get_tablet_map and takes no lock itself.
246 | 0 | bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { |
247 | 0 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
248 | 0 | return tablet_map.find(tablet_id) != tablet_map.end(); |
249 | 0 | } |
250 | | |
// Creates the tablet described by `request` on one of the candidate `stores`.
//
// Parameters:
//   request - create-tablet request; when base_tablet_id is set and > 0 the request
//             is treated as schema change / atomic restore.
//   stores  - candidate data dirs (taken by value; replaced by the base tablet's
//             data dir when in restore mode or when the storage medium matches).
//   profile - receives lock-wait and creation timers.
//
// Returns:
//   - OK when the tablet is created, or when a tablet with this id already exists;
//   - TABLE_CREATE_META_ERROR when a base tablet is required but not found;
//   - CE_CMD_PARAMS_ERROR when _internal_create_tablet_unlocked returns nullptr.
//
// Locking: holds the shard write lock for `tablet_id` until return. When the base
// tablet lives in a different shard, _two_tablet_mtx is taken first and released
// after the base tablet has been fetched.
//
// NOTE(review): `stores[0]` is read for logging before any emptiness check is
// visible in this function - confirm callers never pass an empty vector.
251 | | Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores, |
252 | 303 | RuntimeProfile* profile) { |
253 | 303 | DorisMetrics::instance()->create_tablet_requests_total->increment(1); |
254 | | |
255 | 303 | int64_t tablet_id = request.tablet_id; |
256 | 303 | LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id |
257 | 303 | << ", table_id=" << request.table_id << ", partition_id=" << request.partition_id |
258 | 303 | << ", replica_id=" << request.replica_id << ", stores.size=" << stores.size() |
259 | 303 | << ", first store=" << stores[0]->path(); |
260 | | |
261 | | // When we create rollup tablet A (assume on shard-1) from tablet B (assume on shard-2), |
262 | | // we need to take the write lock on shard-1 and then the read lock on shard-2. |
263 | | // If rollup tablet C (assume on shard-2) is created from tablet D (assume on shard-1) at the same time, we would deadlock. |
264 | 303 | std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock); |
265 | 303 | bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
266 | 303 | bool is_schema_change_or_atomic_restore = |
267 | 303 | request.__isset.base_tablet_id && request.base_tablet_id > 0; |
// The extra mutex is only needed when base and new tablet map to different shards.
268 | 303 | bool need_two_lock = |
269 | 303 | is_schema_change_or_atomic_restore && |
270 | 303 | ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id)); |
271 | 303 | if (need_two_lock) { |
272 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock")); |
273 | 0 | two_tablet_lock.lock(); |
274 | 0 | } |
275 | | |
276 | 303 | MonotonicStopWatch shard_lock_watch; |
277 | 303 | shard_lock_watch.start(); |
278 | 303 | std::lock_guard wrlock(_get_tablets_shard_lock(tablet_id)); |
279 | 303 | shard_lock_watch.stop(); |
280 | 303 | COUNTER_UPDATE(ADD_TIMER(profile, "GetShardLock"), |
281 | 303 | static_cast<int64_t>(shard_lock_watch.elapsed_time())); |
282 | | // Make create_tablet operation to be idempotent: |
283 | | // 1. Return true if tablet with same tablet_id and schema_hash exist; |
284 | | // false if tablet with same tablet_id but different schema_hash exist. |
285 | | // 2. When this is an alter task, if the tablet(both tablet_id and schema_hash are |
286 | | // same) already exist, then just return true(an duplicate request). But if |
287 | | // tablet_id exist but with different schema_hash, return an error(report task will |
288 | | // eventually trigger its deletion). |
// NOTE(review): the check below looks the tablet up by tablet_id only; schema_hash
// is not compared here, so the comment above may describe older behavior.
289 | 303 | { |
290 | 303 | SCOPED_TIMER(ADD_TIMER(profile, "GetTabletUnlocked")); |
291 | 303 | if (_get_tablet_unlocked(tablet_id) != nullptr) { |
292 | 3 | LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; |
293 | 3 | return Status::OK(); |
294 | 3 | } |
295 | 303 | } |
296 | | |
297 | 300 | TabletSharedPtr base_tablet = nullptr; |
298 | | // If the CreateTabletReq has base_tablet_id then it is an alter-tablet request |
299 | 300 | if (is_schema_change_or_atomic_restore) { |
300 | | // if base_tablet_id's lock is different from new_tablet_id's, we need to lock it. |
301 | 0 | if (need_two_lock) { |
302 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet")); |
303 | 0 | base_tablet = get_tablet(request.base_tablet_id); |
304 | 0 | two_tablet_lock.unlock(); |
305 | 0 | } else { |
306 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTabletUnlocked")); |
307 | 0 | base_tablet = _get_tablet_unlocked(request.base_tablet_id); |
308 | 0 | } |
309 | 0 | if (base_tablet == nullptr) { |
310 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
311 | 0 | return Status::Error<TABLE_CREATE_META_ERROR>( |
312 | 0 | "fail to create tablet(change schema/atomic restore), base tablet does not " |
313 | 0 | "exist. new_tablet_id={}, base_tablet_id={}", |
314 | 0 | tablet_id, request.base_tablet_id); |
315 | 0 | } |
316 | | // If we are doing schema-change or atomic-restore, we should use the same data dir |
317 | | // TODO(lingbin): A little trick here, the directory should be determined before |
318 | | // entering this method |
319 | | // |
320 | | // ATTN: Since all restored replicas will be saved to HDD, so no storage_medium check here. |
321 | 0 | if (in_restore_mode || |
322 | 0 | request.storage_medium == base_tablet->data_dir()->storage_medium()) { |
323 | 0 | LOG(INFO) << "create tablet use the base tablet data dir. tablet_id=" << tablet_id |
324 | 0 | << ", base tablet_id=" << request.base_tablet_id |
325 | 0 | << ", data dir=" << base_tablet->data_dir()->path(); |
326 | 0 | stores.clear(); |
327 | 0 | stores.push_back(base_tablet->data_dir()); |
328 | 0 | } |
329 | 0 | } |
330 | | |
331 | | // set alter type to schema-change. it is useless |
332 | 300 | TabletSharedPtr tablet = _internal_create_tablet_unlocked( |
333 | 300 | request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile); |
334 | 300 | if (tablet == nullptr) { |
335 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
336 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}", |
337 | 0 | request.tablet_id); |
338 | 0 | } |
339 | | |
340 | 300 | LOG(INFO) << "success to create tablet. tablet_id=" << tablet_id |
341 | 300 | << ", tablet_path=" << tablet->tablet_path(); |
342 | 300 | return Status::OK(); |
343 | 300 | } |
344 | | |
// Creates the tablet meta and directory, initializes the tablet, creates its
// initial rowset and registers it through _add_tablet_unlocked.
//
// Parameters:
//   request          - create-tablet request (tablet id, schema hash, version, ...).
//   is_schema_change - must be true exactly when `base_tablet` is non-null (DCHECK).
//   base_tablet      - base tablet for schema change, or nullptr.
//   data_dirs        - candidate data dirs, tried by _create_tablet_meta_and_dir_unlocked.
//   profile          - receives the "InternalCreateTablet" timer and its children.
//
// Returns the new tablet on success, nullptr on any failure. On failure after the
// meta/dir step, the tablet is either dropped again (if it had been added) or its
// files and stored meta are removed.
//
// No shard lock is taken here; create_tablet in this file holds it around the call.
345 | | TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( |
346 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
347 | 300 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
348 | | // If in schema-change state, base_tablet must also be provided. |
349 | | // i.e., is_schema_change and base_tablet are either assigned or not assigned |
350 | 300 | DCHECK((is_schema_change && base_tablet) || (!is_schema_change && !base_tablet)); |
351 | | |
352 | | // NOTE: The existence of tablet_id and schema_hash has already been checked, |
353 | | // no need check again here. |
354 | | |
355 | 300 | const std::string parent_timer_name = "InternalCreateTablet"; |
356 | 300 | SCOPED_TIMER(ADD_TIMER(profile, parent_timer_name)); |
357 | | |
358 | 300 | MonotonicStopWatch watch; |
359 | 300 | watch.start(); |
360 | 300 | auto create_meta_timer = ADD_CHILD_TIMER(profile, "CreateMeta", parent_timer_name); |
361 | 300 | auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change, base_tablet, |
362 | 300 | data_dirs, profile); |
363 | 300 | COUNTER_UPDATE(create_meta_timer, static_cast<int64_t>(watch.reset())); |
364 | 300 | if (tablet == nullptr) { |
365 | 0 | return nullptr; |
366 | 0 | } |
367 | | |
368 | 300 | int64_t new_tablet_id = request.tablet_id; |
369 | 300 | int32_t new_schema_hash = request.tablet_schema.schema_hash; |
370 | | |
371 | | // should remove the tablet's pending_id no matter create-tablet success or not |
372 | 300 | DataDir* data_dir = tablet->data_dir(); |
373 | | |
374 | | // TODO(yiguolei) |
375 | | // the following code is very difficult to understand because it mixed alter tablet v2 |
376 | | // and alter tablet v1 should remove alter tablet v1 code after v0.12 |
377 | 300 | Status res = Status::OK(); |
378 | 300 | bool is_tablet_added = false; |
// Single-pass do/while(false): `break` jumps to the shared cleanup below.
379 | 300 | do { |
380 | 300 | res = tablet->init(); |
381 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "TabletInit", parent_timer_name), |
382 | 300 | static_cast<int64_t>(watch.reset())); |
383 | 300 | if (!res.ok()) { |
384 | 0 | LOG(WARNING) << "tablet init failed. tablet:" << tablet->tablet_id(); |
385 | 0 | break; |
386 | 0 | } |
387 | | |
388 | | // Create init version if this is not a restore mode replica and request.version is set |
389 | | // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
390 | | // if (!in_restore_mode && request.__isset.version) { |
391 | | // create initial rowset before add it to storage engine could omit many locks |
392 | 300 | res = tablet->create_initial_rowset(request.version); |
393 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "InitRowset", parent_timer_name), |
394 | 300 | static_cast<int64_t>(watch.reset())); |
395 | 300 | if (!res.ok()) { |
396 | 0 | LOG(WARNING) << "fail to create initial version for tablet. res=" << res; |
397 | 0 | break; |
398 | 0 | } |
399 | | |
400 | 300 | if (is_schema_change) { |
401 | | // if this is a new alter tablet, has to set its state to not ready |
402 | | // because schema change handler depends on it to check whether history data |
403 | | // convert finished |
404 | 0 | static_cast<void>(tablet->set_tablet_state(TabletState::TABLET_NOTREADY)); |
405 | 0 | } |
406 | | // Add tablet to StorageEngine will make it visible to user |
407 | | // Will persist tablet meta |
408 | 300 | auto add_tablet_timer = ADD_CHILD_TIMER(profile, "AddTablet", parent_timer_name); |
409 | 300 | res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false, profile); |
410 | 300 | COUNTER_UPDATE(add_tablet_timer, static_cast<int64_t>(watch.reset())); |
411 | 300 | if (!res.ok()) { |
412 | 0 | LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; |
413 | 0 | break; |
414 | 0 | } |
415 | 300 | is_tablet_added = true; |
416 | | |
417 | | // TODO(lingbin): The following logic seems useless, can be removed? |
418 | | // Because if _add_tablet_unlocked() return OK, we must can get it from map. |
419 | 300 | TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id); |
420 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetTablet", parent_timer_name), |
421 | 300 | static_cast<int64_t>(watch.reset())); |
422 | 300 | if (tablet_ptr == nullptr) { |
// `res` is still the OK status from _add_tablet_unlocked when it is formatted
// into this message.
423 | 0 | res = Status::Error<TABLE_NOT_FOUND>("fail to get tablet. res={}", res); |
424 | 0 | break; |
425 | 0 | } |
426 | 300 | } while (false); |
427 | | |
428 | 300 | if (res.ok()) { |
429 | 300 | return tablet; |
430 | 300 | } |
431 | | // something is wrong, we need clear environment |
432 | 0 | if (is_tablet_added) { |
433 | 0 | Status status = _drop_tablet(new_tablet_id, request.replica_id, false, false, true); |
434 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropTablet", parent_timer_name), |
435 | 0 | static_cast<int64_t>(watch.reset())); |
436 | 0 | if (!status.ok()) { |
// NOTE(review): this prints `res` (the earlier failure), not the `status`
// returned by _drop_tablet - likely meant to log `status`.
437 | 0 | LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; |
438 | 0 | } |
439 | 0 | } else { |
// The tablet never reached the map: remove what was written for it directly.
440 | 0 | tablet->delete_all_files(); |
441 | 0 | static_cast<void>(TabletMetaManager::remove(data_dir, new_tablet_id, new_schema_hash)); |
442 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemoveTabletFiles", parent_timer_name), |
443 | 0 | static_cast<int64_t>(watch.reset())); |
444 | 0 | } |
445 | 0 | return nullptr; |
446 | 300 | } |
447 | | |
// Builds the tablet directory path by joining, in order: `dir`, DATA_PREFIX,
// the decimal `shard_id` and the decimal `tablet_id`, using
// path_util::join_path_segments for each step. File-local helper.
448 | 300 | static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t tablet_id) { |
449 | 300 | string path = dir; |
450 | 300 | path = path_util::join_path_segments(path, DATA_PREFIX); |
451 | 300 | path = path_util::join_path_segments(path, std::to_string(shard_id)); |
452 | 300 | path = path_util::join_path_segments(path, std::to_string(tablet_id)); |
453 | 300 | return path; |
454 | 300 | } |
455 | | |
// Tries each data dir in `data_dirs` in order and returns a Tablet built on the
// first one where both the tablet meta and the schema-hash directory can be
// created. A data dir is skipped when meta creation fails, when the existence
// check fails, when the schema-hash directory already exists, or when creating
// the directory fails. Returns nullptr if no data dir works.
//
// The schema-hash directory is <tablet dir>/<schema_hash>, with the tablet dir
// produced by _gen_tablet_dir. Step timings are recorded under "CreateMeta".
456 | | TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( |
457 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
458 | 300 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
// NOTE(review): `pending_id` is computed but not used anywhere in this function.
459 | 300 | string pending_id = TABLET_ID_PREFIX + std::to_string(request.tablet_id); |
460 | | // Many attempts are made here in the hope that even if a disk fails, it can still continue. |
461 | 300 | std::string parent_timer_name = "CreateMeta"; |
462 | 300 | MonotonicStopWatch watch; |
463 | 300 | watch.start(); |
464 | 300 | for (auto& data_dir : data_dirs) { |
// NOTE(review): the "RemovePendingIds" timer is updated here, but no pending-id
// removal is visible before it in this function.
465 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemovePendingIds", parent_timer_name), |
466 | 300 | static_cast<int64_t>(watch.reset())); |
467 | | |
468 | 300 | TabletMetaSharedPtr tablet_meta; |
469 | | // if create meta failed, do not need to clean dir, because it is only in memory |
470 | 300 | Status res = _create_tablet_meta_unlocked(request, data_dir, is_schema_change, base_tablet, |
471 | 300 | &tablet_meta); |
472 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateMetaUnlock", parent_timer_name), |
473 | 300 | static_cast<int64_t>(watch.reset())); |
474 | 300 | if (!res.ok()) { |
475 | 0 | LOG(WARNING) << "fail to create tablet meta. res=" << res |
476 | 0 | << ", root=" << data_dir->path(); |
477 | 0 | continue; |
478 | 0 | } |
479 | | |
480 | 300 | string tablet_dir = |
481 | 300 | _gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(), request.tablet_id); |
482 | 300 | string schema_hash_dir = path_util::join_path_segments( |
483 | 300 | tablet_dir, std::to_string(request.tablet_schema.schema_hash)); |
484 | | |
485 | | // Because the tablet is removed asynchronously, so that the dir may still exist when BE |
486 | | // receive create-tablet request again, For example retried schema-change request |
487 | 300 | bool exists = true; |
488 | 300 | res = io::global_local_filesystem()->exists(schema_hash_dir, &exists); |
// A failed existence check or a failed mkdir skips this data dir without logging.
489 | 300 | if (!res.ok()) { |
490 | 0 | continue; |
491 | 0 | } |
492 | 300 | if (exists) { |
493 | 0 | LOG(WARNING) << "skip this dir because tablet path exist, path=" << schema_hash_dir; |
494 | 0 | continue; |
495 | 300 | } else { |
496 | 300 | Status st = io::global_local_filesystem()->create_directory(schema_hash_dir); |
497 | 300 | if (!st.ok()) { |
498 | 0 | continue; |
499 | 0 | } |
500 | 300 | } |
501 | | |
// A non-positive partition id is only warned about; creation still proceeds.
502 | 300 | if (tablet_meta->partition_id() <= 0) { |
503 | 233 | LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << ", tablet " |
504 | 233 | << tablet_meta->tablet_id(); |
505 | 233 | } |
506 | 300 | TabletSharedPtr new_tablet = |
507 | 300 | std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
508 | 300 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateTabletFromMeta", parent_timer_name), |
509 | 300 | static_cast<int64_t>(watch.reset())); |
510 | 300 | return new_tablet; |
511 | 300 | } |
512 | 0 | return nullptr; |
513 | 300 | } |
514 | | |
// Public entry point for dropping a tablet. Forwards to _drop_tablet with
// keep_files=false and had_held_shard_lock=false, so files are not kept and the
// shard lock is acquired inside _drop_tablet.
515 | | Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, |
516 | 252 | bool is_drop_table_or_partition) { |
517 | 252 | return _drop_tablet(tablet_id, replica_id, false, is_drop_table_or_partition, false); |
518 | 252 | } |
519 | | |
520 | | // Drop specified tablet. |
// Removes the tablet from the tablet map and its partition, marks it
// TABLET_SHUTDOWN and deregisters it from its data dir.
//
// Parameters:
//   tablet_id                  - tablet to drop.
//   replica_id                 - must match the tablet's replica id unless it is 0.
//   keep_files                 - when true, the meta is not saved and the tablet is
//                                not queued in _shutdown_tablets.
//   is_drop_table_or_partition - when true (and !keep_files), remote rowsets are
//                                removed before the meta is saved.
//   had_held_shard_lock        - when true the shard write lock is NOT taken here
//                                (the unique_lock stays deferred).
//
// Returns:
//   - OK when the tablet is dropped, and also when it does not exist (a warning
//     is logged);
//   - Aborted when `replica_id` is non-zero and differs from the tablet's;
//   - any error from register_transition_tablet, set_tablet_state or
//     remove_all_remote_rowsets. The latter two are checked after the tablet has
//     already been erased from the map.
521 | | Status TabletManager::_drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files, |
522 | 256 | bool is_drop_table_or_partition, bool had_held_shard_lock) { |
523 | 256 | LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id |
524 | 256 | << ", is_drop_table_or_partition=" << is_drop_table_or_partition |
525 | 256 | << ", keep_files=" << keep_files; |
526 | 256 | DorisMetrics::instance()->drop_tablet_requests_total->increment(1); |
527 | | |
// The transition registration is undone by the Defer on every exit path below.
528 | 256 | RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet")); |
529 | 256 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }}; |
530 | | |
531 | | // Fetch tablet which need to be dropped |
532 | 256 | TabletSharedPtr to_drop_tablet; |
533 | 256 | { |
534 | 256 | std::unique_lock<std::shared_mutex> wlock(_get_tablets_shard_lock(tablet_id), |
535 | 256 | std::defer_lock); |
536 | 256 | if (!had_held_shard_lock) { |
537 | 252 | wlock.lock(); |
538 | 252 | } |
539 | 256 | to_drop_tablet = _get_tablet_unlocked(tablet_id); |
540 | 256 | if (to_drop_tablet == nullptr) { |
541 | 1 | LOG(WARNING) << "fail to drop tablet because it does not exist. " |
542 | 1 | << "tablet_id=" << tablet_id; |
543 | 1 | return Status::OK(); |
544 | 1 | } |
545 | | |
546 | | // We should compare replica id to avoid dropping new cloned tablet. |
547 | | // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. |
548 | 255 | if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { |
549 | 0 | return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(), |
550 | 0 | replica_id); |
551 | 0 | } |
552 | | |
553 | 255 | _remove_tablet_from_partition(to_drop_tablet); |
554 | 255 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
555 | 255 | tablet_map.erase(tablet_id); |
556 | 255 | } |
557 | | |
558 | 0 | to_drop_tablet->clear_cache(); |
559 | | |
560 | 255 | { |
561 | | // drop tablet will update tablet meta, should lock |
562 | 255 | std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock()); |
563 | 255 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
564 | | // NOTE: has to update tablet here, but must not update tablet meta directly. |
565 | | // because other thread may hold the tablet object, they may save meta too. |
566 | | // If update meta directly here, other thread may override the meta |
567 | | // and the tablet will be loaded at restart time. |
568 | | // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. |
569 | | // |
570 | | // Until now, only the restore task uses keep files. |
571 | 255 | RETURN_IF_ERROR(to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN)); |
572 | 255 | if (!keep_files) { |
573 | 253 | LOG(INFO) << "set tablet to shutdown state and remove it from memory. " |
574 | 253 | << "tablet_id=" << tablet_id |
575 | 253 | << ", tablet_path=" << to_drop_tablet->tablet_path(); |
576 | | // We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta, |
577 | | // otherwise if BE shutdown after saving tablet state, these remote rowsets path info will be lost. |
578 | 253 | if (is_drop_table_or_partition) { |
579 | 0 | RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets()); |
580 | 0 | } |
581 | 253 | to_drop_tablet->save_meta(); |
582 | 253 | { |
583 | 253 | std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
584 | 253 | _shutdown_tablets.push_back(to_drop_tablet); |
585 | 253 | } |
586 | 253 | } |
587 | 255 | } |
588 | | |
589 | 255 | to_drop_tablet->deregister_tablet_from_dir(); |
// Mirror of the increment done when the tablet was added to the map.
590 | 255 | g_tablet_meta_schema_columns_count << -to_drop_tablet->tablet_meta()->tablet_columns_num(); |
591 | 255 | return Status::OK(); |
592 | 255 | } |
593 | | |
// Looks up a tablet by id while holding the shard lock for that id in shared
// mode. `include_deleted` and `err` are forwarded unchanged to
// _get_tablet_unlocked; returns nullptr when that lookup yields no tablet.
594 | 2.69k | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) { |
595 | 2.69k | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
596 | 2.69k | return _get_tablet_unlocked(tablet_id, include_deleted, err); |
597 | 2.69k | } |
598 | | |
// Returns every tablet for which `filter` returns true, collected through
// for_each_tablet into a vector of shared pointers.
599 | 0 | std::vector<TabletSharedPtr> TabletManager::get_all_tablet(std::function<bool(Tablet*)>&& filter) { |
600 | 0 | std::vector<TabletSharedPtr> res; |
601 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { res.emplace_back(tablet); }, |
602 | 0 | std::move(filter)); |
603 | 0 | return res; |
604 | 0 | } |
605 | | |
// Invokes `handler` on every tablet accepted by `filter`, shard by shard.
// For each shard, the matching tablets are first copied into a local vector
// while the shard lock is held in shared mode; the lock is released before
// `handler` runs, so the handler is not called under the shard lock.
// `filter` is called unconditionally for every tablet, so it must be callable.
606 | | void TabletManager::for_each_tablet(std::function<void(const TabletSharedPtr&)>&& handler, |
607 | 11 | std::function<bool(Tablet*)>&& filter) { |
608 | 11 | std::vector<TabletSharedPtr> tablets; |
609 | 11 | for (const auto& tablets_shard : _tablets_shards) { |
// The buffer is reused across shards.
610 | 11 | tablets.clear(); |
611 | 11 | { |
612 | 11 | std::shared_lock rdlock(tablets_shard.lock); |
613 | 257 | for (const auto& [id, tablet] : tablets_shard.tablet_map) { |
614 | 257 | if (filter(tablet.get())) { |
615 | 257 | tablets.emplace_back(tablet); |
616 | 257 | } |
617 | 257 | } |
618 | 11 | } |
619 | 257 | for (const auto& tablet : tablets) { |
620 | 257 | handler(tablet); |
621 | 257 | } |
622 | 11 | } |
623 | 11 | } |
624 | | |
// Looks up a tablet by id without taking the shard lock.
//
// The single-argument _get_tablet_unlocked overload is tried first. If it finds
// nothing and `include_deleted` is true, _shutdown_tablets is scanned (under
// _shutdown_tablets_lock, shared) for the first entry with a matching tablet id.
//
// Returns nullptr when no tablet is found; outside BE_TEST builds it also returns
// nullptr when the tablet reports !is_used(). In both cases, if `err` is non-null
// it receives a message that includes BackendOptions::get_localhost().
625 | | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, |
626 | 2.69k | string* err) { |
627 | 2.69k | TabletSharedPtr tablet; |
628 | 2.69k | tablet = _get_tablet_unlocked(tablet_id); |
629 | 2.69k | if (tablet == nullptr && include_deleted) { |
630 | 4 | std::shared_lock rdlock(_shutdown_tablets_lock); |
631 | 4 | for (auto& deleted_tablet : _shutdown_tablets) { |
632 | 2 | CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr"; |
633 | 2 | if (deleted_tablet->tablet_id() == tablet_id) { |
634 | 2 | tablet = deleted_tablet; |
635 | 2 | break; |
636 | 2 | } |
637 | 2 | } |
638 | 4 | } |
639 | | |
640 | 2.69k | if (tablet == nullptr) { |
641 | 239 | if (err != nullptr) { |
642 | 1 | *err = "tablet does not exist. " + BackendOptions::get_localhost(); |
643 | 1 | } |
644 | 239 | return nullptr; |
645 | 239 | } |
// The usability check is compiled out when BE_TEST is defined.
646 | | #ifndef BE_TEST |
647 | | if (!tablet->is_used()) { |
648 | | LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; |
649 | | if (err != nullptr) { |
650 | | *err = "tablet cannot be used. " + BackendOptions::get_localhost(); |
651 | | } |
652 | | return nullptr; |
653 | | } |
654 | | #endif |
655 | | |
656 | 2.45k | return tablet; |
657 | 2.69k | } |
658 | | |
659 | | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, TabletUid tablet_uid, |
660 | 0 | bool include_deleted, string* err) { |
661 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
662 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, include_deleted, err); |
663 | 0 | if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) { |
664 | 0 | return tablet; |
665 | 0 | } |
666 | 0 | return nullptr; |
667 | 0 | } |
668 | | |
669 | 0 | uint64_t TabletManager::get_rowset_nums() { |
670 | 0 | uint64_t rowset_nums = 0; |
671 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { rowset_nums += tablet->version_count(); }, |
672 | 0 | filter_all_tablets); |
673 | 0 | return rowset_nums; |
674 | 0 | } |
675 | | |
676 | 0 | uint64_t TabletManager::get_segment_nums() { |
677 | 0 | uint64_t segment_nums = 0; |
678 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { segment_nums += tablet->segment_count(); }, |
679 | 0 | filter_all_tablets); |
680 | 0 | return segment_nums; |
681 | 0 | } |
682 | | |
683 | | bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, |
684 | | TTabletId* tablet_id, |
685 | 37 | TSchemaHash* schema_hash) { |
686 | | // the path like: /data/14/10080/964828783/ |
687 | 37 | static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); |
688 | | // match tablet schema hash data path, for example, the path is /data/1/16791/29998 |
689 | | // 1 is shard id , 16791 is tablet id, 29998 is schema hash |
690 | 37 | if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) { |
691 | 33 | return true; |
692 | 33 | } |
693 | | |
694 | | // If we can't match normal path pattern, this may be a path which is a empty tablet |
695 | | // directory. Use this pattern to match empty tablet directory. In this case schema_hash |
696 | | // will be set to zero. |
697 | 4 | static re2::RE2 empty_tablet_re("/data/\\d+/(\\d+)($|/$)"); |
698 | 4 | if (!RE2::PartialMatch(path, empty_tablet_re, tablet_id)) { |
699 | 2 | return false; |
700 | 2 | } |
701 | 2 | *schema_hash = 0; |
702 | 2 | return true; |
703 | 4 | } |
704 | | |
705 | 3 | bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { |
706 | | // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat |
707 | 3 | static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); |
708 | 3 | string id_str; |
709 | 3 | bool ret = RE2::PartialMatch(path, re, &id_str); |
710 | 3 | if (ret) { |
711 | 1 | rowset_id->init(id_str); |
712 | 1 | return true; |
713 | 1 | } |
714 | 2 | return false; |
715 | 3 | } |
716 | | |
717 | 0 | void TabletManager::get_tablet_stat(TTabletStatResult* result) { |
718 | 0 | std::shared_ptr<std::vector<TTabletStat>> local_cache; |
719 | 0 | { |
720 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
721 | 0 | local_cache = _tablet_stat_list_cache; |
722 | 0 | } |
723 | 0 | result->__set_tablet_stat_list(*local_cache); |
724 | 0 | } |
725 | | |
726 | | struct TabletScore { |
727 | | TabletSharedPtr tablet_ptr; |
728 | | int score; |
729 | | }; |
730 | | |
// Selects the tablets that should be compacted next.
//
// Every tablet is visited through for_each_tablet() and goes through a series of
// eligibility checks (auto compaction disabled, skip window, can_do_compaction()
// for `data_dir`, already submitted, recent failure, compaction lock currently
// held). Tablets that survive are scored with calc_compaction_score() and ranked.
//
// Parameters:
//   compaction_type                     base or cumulative compaction.
//   data_dir                            its path_hash() is passed to can_do_compaction().
//   tablet_submitted_compaction         tablets that already have a submitted task; skipped.
//   score                               out: max(highest_score, single_compact_highest_score).
//   all_cumulative_compaction_policies  looked up with .at() by the tablet's compaction policy
//                                       name.
//
// Returns the picked tablets in this order: the single best tablet (if any), then
// the top tablets by descending score, then the best "single compaction" (fetch
// from peer) tablet if its score is at least `highest_score`.
std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
        CompactionType compaction_type, DataDir* data_dir,
        const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score,
        const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
                all_cumulative_compaction_policies) {
    int64_t now_ms = UnixMillis();
    const string& compaction_type_str =
            compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
    uint32_t highest_score = 0;
    // find the single compaction tablet
    uint32_t single_compact_highest_score = 0;
    TabletSharedPtr best_tablet;
    TabletSharedPtr best_single_compact_tablet;
    // With this comparator the priority queue keeps the LOWEST score on top, so the
    // weakest candidate is the one that gets popped when the queue overflows.
    auto cmp = [](TabletScore left, TabletScore right) { return left.score > right.score; };
    std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> top_tablets(cmp);

    auto handler = [&](const TabletSharedPtr& tablet_ptr) {
        if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
            LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id()
                                   << " will be ignored by automatic compaction tasks since it's "
                                   << "set to disabled automatic compaction.";
            return;
        }

        if (config::enable_skip_tablet_compaction &&
            tablet_ptr->should_skip_compaction(compaction_type, UnixSeconds())) {
            return;
        }
        if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
            return;
        }

        // Skip tablets that already have a compaction task submitted.
        auto search = tablet_submitted_compaction.find(tablet_ptr);
        if (search != tablet_submitted_compaction.end()) {
            return;
        }

        // Back off when the last failure of this compaction type happened within
        // config::tablet_sched_delay_time_ms of now.
        int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time();
        if (compaction_type == CompactionType::BASE_COMPACTION) {
            last_failure_ms = tablet_ptr->last_base_compaction_failure_time();
        }
        if (now_ms - last_failure_ms <= config::tablet_sched_delay_time_ms) {
            VLOG_DEBUG << "Too often to check compaction, skip it. "
                       << "compaction_type=" << compaction_type_str
                       << ", last_failure_time_ms=" << last_failure_ms
                       << ", tablet_id=" << tablet_ptr->tablet_id();
            return;
        }

        // The try-locks below only probe whether the compaction lock is free: the
        // unique_lock goes out of scope (and releases the mutex) at the end of the
        // enclosing if/else branch, before the score is computed.
        if (compaction_type == CompactionType::BASE_COMPACTION) {
            std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(),
                                              std::try_to_lock);
            if (!lock.owns_lock()) {
                LOG(INFO) << "can not get base lock: " << tablet_ptr->tablet_id();
                return;
            }
        } else {
            std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(),
                                              std::try_to_lock);
            if (!lock.owns_lock()) {
                LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id();
                return;
            }
        }
        // NOTE: .at() throws std::out_of_range if the tablet's policy name is not in the map.
        auto cumulative_compaction_policy = all_cumulative_compaction_policies.at(
                tablet_ptr->tablet_meta()->compaction_policy());
        uint32_t current_compaction_score = tablet_ptr->calc_compaction_score();
        // A low score marks the tablet as skippable for later rounds, but the tablet
        // still takes part in the ranking of this round (there is no return here).
        if (current_compaction_score < 5) {
            tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds());
        }

        // tablet should do single compaction
        if (current_compaction_score > single_compact_highest_score &&
            tablet_ptr->should_fetch_from_peer()) {
            bool ret = tablet_ptr->suitable_for_compaction(compaction_type,
                                                           cumulative_compaction_policy);
            if (ret) {
                single_compact_highest_score = current_compaction_score;
                best_single_compact_tablet = tablet_ptr;
            }
        }

        if (config::compaction_num_per_round > 1 && !tablet_ptr->should_fetch_from_peer()) {
            // Multi-pick mode: keep at most compaction_num_per_round tablets in the heap.
            TabletScore ts;
            ts.score = current_compaction_score;
            ts.tablet_ptr = tablet_ptr;
            // Only run the suitability check when the tablet could actually enter the
            // heap: either the heap is not full yet, or it beats the current minimum.
            if ((top_tablets.size() >= config::compaction_num_per_round &&
                 current_compaction_score > top_tablets.top().score) ||
                top_tablets.size() < config::compaction_num_per_round) {
                bool ret = tablet_ptr->suitable_for_compaction(compaction_type,
                                                               cumulative_compaction_policy);
                if (ret) {
                    top_tablets.push(ts);
                    if (top_tablets.size() > config::compaction_num_per_round) {
                        top_tablets.pop();
                    }
                    if (current_compaction_score > highest_score) {
                        highest_score = current_compaction_score;
                    }
                }
            }
        } else {
            // Single-pick mode: reached when compaction_num_per_round <= 1 or the tablet
            // fetches from a peer; only non-peer tablets can become `best_tablet`.
            if (current_compaction_score > highest_score && !tablet_ptr->should_fetch_from_peer()) {
                bool ret = tablet_ptr->suitable_for_compaction(compaction_type,
                                                               cumulative_compaction_policy);
                if (ret) {
                    highest_score = current_compaction_score;
                    best_tablet = tablet_ptr;
                }
            }
        }
    };

    for_each_tablet(handler, filter_all_tablets);
    std::vector<TabletSharedPtr> picked_tablet;
    if (best_tablet != nullptr) {
        VLOG_CRITICAL << "Found the best tablet for compaction. "
                      << "compaction_type=" << compaction_type_str
                      << ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path()
                      << ", highest_score=" << highest_score
                      << ", fetch from peer: " << best_tablet->should_fetch_from_peer();
        picked_tablet.emplace_back(std::move(best_tablet));
    }

    // The heap pops tablets in ascending score order; collect them first and then
    // append in reverse so that picked_tablet lists them by descending score.
    std::vector<TabletSharedPtr> reverse_top_tablets;
    while (!top_tablets.empty()) {
        reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr);
        top_tablets.pop();
    }

    for (auto it = reverse_top_tablets.rbegin(); it != reverse_top_tablets.rend(); ++it) {
        picked_tablet.emplace_back(*it);
    }

    // pick single compaction tablet needs the highest score
    if (best_single_compact_tablet != nullptr && single_compact_highest_score >= highest_score) {
        VLOG_CRITICAL << "Found the best tablet for single compaction. "
                      << "compaction_type=" << compaction_type_str
                      << ", tablet_id=" << best_single_compact_tablet->tablet_id()
                      << ", path=" << data_dir->path()
                      << ", highest_score=" << single_compact_highest_score << ", fetch from peer: "
                      << best_single_compact_tablet->should_fetch_from_peer();
        picked_tablet.emplace_back(std::move(best_single_compact_tablet));
    }
    // Report the larger of the two best scores to the caller.
    *score = highest_score > single_compact_highest_score ? highest_score
                                                          : single_compact_highest_score;
    return picked_tablet;
}
879 | | |
// Builds a Tablet from a serialized tablet meta and registers it in the manager.
//
// Parameters:
//   data_dir     data directory the tablet belongs to (also used in error messages).
//   tablet_id    expected tablet id; must equal the id stored in the meta.
//   schema_hash  expected schema hash; must equal the one stored in the meta.
//   meta_binary  serialized tablet meta to deserialize.
//   update_meta  forwarded to _add_tablet_unlocked().
//   force        forwarded to _add_tablet_unlocked().
//   restore      when true the tablet state is reset to TABLET_RUNNING before loading.
//   check_path   when true the tablet directory must exist on the local filesystem.
//
// Returns Status::OK() on success, otherwise an error describing which validation
// or loading step failed. A tablet whose meta is in TABLET_SHUTDOWN state is put
// on the `_shutdown_tablets` list and reported as TABLE_ALREADY_DELETED_ERROR.
Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id,
                                            TSchemaHash schema_hash, std::string_view meta_binary,
                                            bool update_meta, bool force, bool restore,
                                            bool check_path) {
    TabletMetaSharedPtr tablet_meta(new TabletMeta());
    Status status = tablet_meta->deserialize(meta_binary);
    if (!status.ok()) {
        return Status::Error<HEADER_PB_PARSE_FAILED>(
                "fail to load tablet because can not parse meta_binary string. tablet_id={}, "
                "schema_hash={}, path={}, status={}",
                tablet_id, schema_hash, data_dir->path(), status);
    }

    // check if tablet meta is valid
    if (tablet_meta->tablet_id() != tablet_id || tablet_meta->schema_hash() != schema_hash) {
        return Status::Error<HEADER_PB_PARSE_FAILED>(
                "fail to load tablet because meet invalid tablet meta. trying to load "
                "tablet(tablet_id={}, schema_hash={}), but meet tablet={}, path={}",
                tablet_id, schema_hash, tablet_meta->tablet_id(), data_dir->path());
    }
    // An all-zero uid is rejected as invalid.
    if (tablet_meta->tablet_uid().hi == 0 && tablet_meta->tablet_uid().lo == 0) {
        return Status::Error<HEADER_PB_PARSE_FAILED>(
                "fail to load tablet because its uid == 0. tablet={}, path={}",
                tablet_meta->tablet_id(), data_dir->path());
    }

    if (restore) {
        // we're restoring tablet from trash, tablet state should be changed from shutdown back to running
        tablet_meta->set_tablet_state(TABLET_RUNNING);
    }

    // A zero partition id is only logged; loading continues.
    if (tablet_meta->partition_id() == 0) {
        LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0";
    }

    TabletSharedPtr tablet = std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir);

    // NOTE: method load_tablet_from_meta could be called in two cases:
    // case 1: BE start;
    // case 2: Clone Task/Restore
    // Case 1 doesn't need the path check because BE is just starting and not ready;
    // checking the tablet meta state is enough to judge whether the tablet is deleted.
    // For case 2, if a tablet has just been copied to the local BE,
    // it may be cleared by gc-thread(see perform_tablet_gc) because the tablet meta may not be loaded to memory.
    // So the clone task should check the path, then fail and retry in this case.
    if (check_path) {
        bool exists = true;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(tablet->tablet_path(), &exists));
        if (!exists) {
            return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
                    "tablet path not exists, create tablet failed, path={}", tablet->tablet_path());
        }
    }

    if (tablet->tablet_meta()->tablet_state() == TABLET_SHUTDOWN) {
        {
            // Remember the shutdown tablet so that it is visible on the shutdown list.
            std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock);
            _shutdown_tablets.push_back(tablet);
        }
        return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
                "fail to load tablet because it is to be deleted. tablet_id={}, schema_hash={}, "
                "path={}",
                tablet_id, schema_hash, data_dir->path());
    }
    // NOTE: We do not check tablet's initial version here, because if BE restarts when
    // one tablet is doing schema-change, we may meet empty tablet.
    if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) {
        // tablet state is invalid, drop tablet
        return Status::Error<TABLE_INDEX_VALIDATE_ERROR>(
                "fail to load tablet. it is in running state but without delta. tablet={}, path={}",
                tablet->tablet_id(), data_dir->path());
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(
            tablet->init(), absl::Substitute("tablet init failed. tablet=$0", tablet->tablet_id()));

    RuntimeProfile profile("CreateTablet");
    // The shard write lock is held from here until the function returns, covering
    // the insertion done by _add_tablet_unlocked().
    std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id));
    RETURN_NOT_OK_STATUS_WITH_WARN(
            _add_tablet_unlocked(tablet_id, tablet, update_meta, force, &profile),
            absl::Substitute("fail to add tablet. tablet=$0", tablet->tablet_id()));

    return Status::OK();
}
964 | | |
965 | | Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, |
966 | | SchemaHash schema_hash, const string& schema_hash_path, |
967 | 3 | bool force, bool restore) { |
968 | 3 | LOG(INFO) << "begin to load tablet from dir. " |
969 | 3 | << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash |
970 | 3 | << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; |
971 | | // not add lock here, because load_tablet_from_meta already add lock |
972 | 3 | std::string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id); |
973 | | // should change shard id before load tablet |
974 | 3 | std::string shard_path = |
975 | 3 | path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path))); |
976 | 3 | std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1); |
977 | 3 | int32_t shard = stol(shard_str); |
978 | | |
979 | 3 | bool exists = false; |
980 | 3 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(header_path, &exists)); |
981 | 3 | if (!exists) { |
982 | 0 | return Status::Error<NOT_FOUND>("fail to find header file. [header_path={}]", header_path); |
983 | 0 | } |
984 | | |
985 | 3 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
986 | 3 | if (!tablet_meta->create_from_file(header_path).ok()) { |
987 | 0 | return Status::Error<ENGINE_LOAD_INDEX_TABLE_ERROR>( |
988 | 0 | "fail to load tablet_meta. file_path={}", header_path); |
989 | 0 | } |
990 | 3 | TabletUid tablet_uid = TabletUid::gen_uid(); |
991 | | |
992 | | // remove rowset binlog metas |
993 | 3 | auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", schema_hash_path); |
994 | 3 | bool binlog_metas_file_exists = false; |
995 | 3 | auto file_exists_status = |
996 | 3 | io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); |
997 | 3 | if (!file_exists_status.ok()) { |
998 | 0 | return file_exists_status; |
999 | 0 | } |
1000 | 3 | bool contain_binlog = false; |
1001 | 3 | RowsetBinlogMetasPB rowset_binlog_metas_pb; |
1002 | 3 | if (binlog_metas_file_exists) { |
1003 | 0 | auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); |
1004 | 0 | if (binlog_meta_filesize > 0) { |
1005 | 0 | contain_binlog = true; |
1006 | 0 | RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); |
1007 | 0 | VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; |
1008 | 0 | } |
1009 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); |
1010 | 0 | } |
1011 | 3 | if (contain_binlog) { |
1012 | 0 | auto binlog_dir = fmt::format("{}/_binlog", schema_hash_path); |
1013 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); |
1014 | | |
1015 | 0 | std::vector<io::FileInfo> files; |
1016 | 0 | RETURN_IF_ERROR( |
1017 | 0 | io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); |
1018 | 0 | for (auto& file : files) { |
1019 | 0 | auto& filename = file.file_name; |
1020 | 0 | std::string new_suffix; |
1021 | 0 | std::string old_suffix; |
1022 | |
|
1023 | 0 | if (filename.ends_with(".binlog")) { |
1024 | 0 | old_suffix = ".binlog"; |
1025 | 0 | new_suffix = ".dat"; |
1026 | 0 | } else if (filename.ends_with(".binlog-index")) { |
1027 | 0 | old_suffix = ".binlog-index"; |
1028 | 0 | new_suffix = ".idx"; |
1029 | 0 | } else { |
1030 | 0 | continue; |
1031 | 0 | } |
1032 | | |
1033 | 0 | std::string new_filename = filename; |
1034 | 0 | new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(), |
1035 | 0 | new_suffix); |
1036 | 0 | auto from = fmt::format("{}/{}", schema_hash_path, filename); |
1037 | 0 | auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); |
1038 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); |
1039 | 0 | } |
1040 | | |
1041 | 0 | auto* meta = store->get_meta(); |
1042 | | // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas |
1043 | 0 | RETURN_IF_ERROR( |
1044 | 0 | RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); |
1045 | 0 | } |
1046 | | |
1047 | | // has to change shard id here, because meta file maybe copied from other source |
1048 | | // its shard is different from local shard |
1049 | 3 | tablet_meta->set_shard_id(shard); |
1050 | | // load dir is called by clone, restore, storage migration |
1051 | | // should change tablet uid when tablet object changed |
1052 | 3 | tablet_meta->set_tablet_uid(std::move(tablet_uid)); |
1053 | 3 | std::string meta_binary; |
1054 | 3 | tablet_meta->serialize(&meta_binary); |
1055 | 3 | RETURN_NOT_OK_STATUS_WITH_WARN( |
1056 | 3 | load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force, restore, |
1057 | 3 | true), |
1058 | 3 | absl::Substitute("fail to load tablet. header_path=$0", header_path)); |
1059 | | |
1060 | 3 | return Status::OK(); |
1061 | 3 | } |
1062 | | |
1063 | 0 | Status TabletManager::report_tablet_info(TTabletInfo* tablet_info) { |
1064 | 0 | LOG(INFO) << "begin to process report tablet info." |
1065 | 0 | << "tablet_id=" << tablet_info->tablet_id; |
1066 | |
|
1067 | 0 | Status res = Status::OK(); |
1068 | |
|
1069 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id); |
1070 | 0 | if (tablet == nullptr) { |
1071 | 0 | return Status::Error<TABLE_NOT_FOUND>("can't find tablet={}", tablet_info->tablet_id); |
1072 | 0 | } |
1073 | | |
1074 | 0 | tablet->build_tablet_report_info(tablet_info); |
1075 | 0 | VLOG_TRACE << "success to process report tablet info."; |
1076 | 0 | return res; |
1077 | 0 | } |
1078 | | |
1079 | 0 | void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { |
1080 | 0 | DCHECK(tablets_info != nullptr); |
1081 | 0 | VLOG_NOTICE << "begin to build all report tablets info"; |
1082 | | |
1083 | | // build the expired txn map first, outside the tablet map lock |
1084 | 0 | std::map<TabletInfo, std::vector<int64_t>> expire_txn_map; |
1085 | 0 | _engine.txn_manager()->build_expire_txn_map(&expire_txn_map); |
1086 | 0 | LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; |
1087 | |
|
1088 | 0 | HistogramStat tablet_version_num_hist; |
1089 | 0 | auto local_cache = std::make_shared<std::vector<TTabletStat>>(); |
1090 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1091 | 0 | auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; |
1092 | 0 | TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back(); |
1093 | 0 | tablet->build_tablet_report_info(&tablet_info, true, true); |
1094 | | // find expired transaction corresponding to this tablet |
1095 | 0 | TabletInfo tinfo(tablet->tablet_id(), tablet->tablet_uid()); |
1096 | 0 | auto find = expire_txn_map.find(tinfo); |
1097 | 0 | if (find != expire_txn_map.end()) { |
1098 | 0 | tablet_info.__set_transaction_ids(find->second); |
1099 | 0 | expire_txn_map.erase(find); |
1100 | 0 | } |
1101 | 0 | tablet_version_num_hist.add(tablet_info.total_version_count); |
1102 | 0 | auto& t_tablet_stat = local_cache->emplace_back(); |
1103 | 0 | t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); |
1104 | 0 | t_tablet_stat.__set_data_size(tablet_info.data_size); |
1105 | 0 | t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); |
1106 | 0 | t_tablet_stat.__set_row_count(tablet_info.row_count); |
1107 | 0 | t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); |
1108 | 0 | t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); |
1109 | 0 | t_tablet_stat.__set_visible_version(tablet_info.version); |
1110 | 0 | t_tablet_stat.__set_local_index_size(tablet_info.local_index_size); |
1111 | 0 | t_tablet_stat.__set_local_segment_size(tablet_info.local_segment_size); |
1112 | 0 | t_tablet_stat.__set_remote_index_size(tablet_info.remote_index_size); |
1113 | 0 | t_tablet_stat.__set_remote_segment_size(tablet_info.remote_segment_size); |
1114 | 0 | }; |
1115 | 0 | for_each_tablet(handler, filter_all_tablets); |
1116 | |
|
1117 | 0 | { |
1118 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
1119 | 0 | _tablet_stat_list_cache.swap(local_cache); |
1120 | 0 | } |
1121 | 0 | DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( |
1122 | 0 | tablet_version_num_hist); |
1123 | 0 | LOG(INFO) << "success to build all report tablets info. tablet_count=" << tablets_info->size(); |
1124 | 0 | } |
1125 | | |
// Periodic cleanup entry point.
//
// 1. Asks every tablet to delete its expired stale rowsets.
// 2. Walks the `_shutdown_tablets` list and moves tablets that are no longer
//    referenced elsewhere to trash via _move_tablet_to_trash(), in rounds of at
//    most 200 successful moves each.
//
// Only one sweep runs at a time: if `_gc_tablets_lock` cannot be acquired the
// call returns immediately. Always returns Status::OK().
Status TabletManager::start_trash_sweep() {
    DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK);
    std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock);
    if (!lock.try_lock()) {
        // Another sweep is in progress.
        return Status::OK();
    }

    for_each_tablet([](const TabletSharedPtr& tablet) { tablet->delete_expired_stale_rowset(); },
                    filter_all_tablets);

    // `last_it` is a cursor into _shutdown_tablets that is carried across the
    // batches below; nothing to do when the list is empty.
    std::list<TabletSharedPtr>::iterator last_it;
    {
        std::shared_lock rdlock(_shutdown_tablets_lock);
        last_it = _shutdown_tablets.begin();
        if (last_it == _shutdown_tablets.end()) {
            return Status::OK();
        }
    }

    // Under the write lock, advances the cursor and removes up to `limit` tablets
    // from _shutdown_tablets, returning them. Tablets that are still referenced
    // elsewhere are left in the list and skipped.
    auto get_batch_tablets = [this, &last_it](int limit) {
        std::vector<TabletSharedPtr> batch_tablets;
        std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
        while (last_it != _shutdown_tablets.end() && batch_tablets.size() < limit) {
            // it means current tablet is referenced by other thread
            if (last_it->use_count() > 1) {
                last_it++;
            } else {
                batch_tablets.push_back(*last_it);
                last_it = _shutdown_tablets.erase(last_it);
            }
        }

        return batch_tablets;
    };

    // Tablets that could not be moved to trash; they are put back at the end.
    std::list<TabletSharedPtr> failed_tablets;
    // return true if need continue delete
    auto delete_one_batch = [this, get_batch_tablets, &failed_tablets]() -> bool {
        // Remaining number of successful moves allowed in this round.
        int limit = 200;
        for (;;) {
            auto batch_tablets = get_batch_tablets(limit);
            for (const auto& tablet : batch_tablets) {
                if (_move_tablet_to_trash(tablet)) {
                    limit--;
                } else {
                    failed_tablets.push_back(tablet);
                }
            }
            // Budget used up: more work may remain, ask the caller for another round.
            if (limit <= 0) {
                return true;
            }
            // Cursor reached the end of the list (or nothing was eligible): done.
            if (batch_tablets.empty()) {
                return false;
            }
        }

        return false;
    };

    // Pause between rounds (skipped in unit tests).
    while (delete_one_batch()) {
#ifndef BE_TEST
        sleep(1);
#endif
    }

    // Re-queue the failed tablets so that a later sweep can retry them.
    if (!failed_tablets.empty()) {
        std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock);
        _shutdown_tablets.splice(_shutdown_tablets.end(), failed_tablets);
    }

    return Status::OK();
}
1198 | | |
// Moves the data of a shutdown tablet to trash and removes its meta.
//
// Returns true when the tablet is finished with (moved, or found to need no
// action) and false when the attempt failed; start_trash_sweep() re-queues the
// tablet in the false case.
//
// NOTE(review): the function returns bool, yet it uses RETURN_IF_ERROR and in one
// place returns a Status directly, so it relies on Status being convertible to
// bool (presumably true when OK) -- confirm against the Status definition.
bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) {
    // Mark the tablet as being in transition for the duration of this call; the
    // Defer undoes the registration on every exit path below.
    RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash"));
    Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }};

    // A tablet with the same id may be registered (not shut down) in the manager.
    TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id());
    if (tablet_in_not_shutdown) {
        TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash();
        size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash();
        if (tablet->schema_hash() == schema_hash_not_shutdown &&
            tablet->data_dir()->path_hash() == path_hash_not_shutdown) {
            tablet->clear_cache();
            // shard_id in memory not eq shard_id in shutdown
            if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) {
                LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id="
                          << tablet_in_not_shutdown->tablet_id()
                          << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
                          << ", shutdown tablet path=" << tablet->tablet_path();
                return tablet->data_dir()->move_to_trash(tablet->tablet_path());
            } else {
                // Same path as the registered tablet: the directory is left in place.
                LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id="
                          << tablet_in_not_shutdown->tablet_id()
                          << ", mem manager tablet path=" << tablet_in_not_shutdown->tablet_path()
                          << ", shutdown tablet path=" << tablet->tablet_path();
                return true;
            }
        }
    }

    // Re-read the meta from the meta store to verify that the tablet is still the
    // same shutdown tablet before touching its files.
    TabletMetaSharedPtr tablet_meta(new TabletMeta());
    int64_t get_meta_ts = MonotonicMicros();
    Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(),
                                                  tablet->schema_hash(), tablet_meta);
    if (check_st.ok()) {
        if (tablet_meta->tablet_state() != TABLET_SHUTDOWN ||
            tablet_meta->tablet_uid() != tablet->tablet_uid()) {
            // The stored meta is not in shutdown state or has a different uid:
            // leave the files alone and drop this entry from the gc queue.
            LOG(WARNING) << "tablet's state changed to normal, skip remove dirs"
                         << " tablet id = " << tablet_meta->tablet_id()
                         << " schema hash = " << tablet_meta->schema_hash()
                         << " old tablet_uid=" << tablet->tablet_uid()
                         << " cur tablet_uid=" << tablet_meta->tablet_uid();
            return true;
        }

        tablet->clear_cache();

        // move data to trash
        const auto& tablet_path = tablet->tablet_path();
        bool exists = false;
        Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists);
        if (!exists_st) {
            return false;
        }
        if (exists) {
            // take snapshot of tablet meta
            auto meta_file_path = fmt::format("{}/{}.hdr", tablet_path, tablet->tablet_id());
            int64_t save_meta_ts = MonotonicMicros();
            auto save_st = tablet->tablet_meta()->save(meta_file_path);
            if (!save_st.ok()) {
                LOG(WARNING) << "failed to save meta, tablet_id=" << tablet_meta->tablet_id()
                             << ", tablet_uid=" << tablet_meta->tablet_uid()
                             << ", error=" << save_st;
                return false;
            }
            int64_t now = MonotonicMicros();
            // NOTE: the "get meta cost" interval below spans everything between the
            // two timestamps, i.e. it also includes clear_cache() and the exists() check.
            LOG(INFO) << "start to move tablet to trash. " << tablet_path
                      << ". rocksdb get meta cost " << (save_meta_ts - get_meta_ts)
                      << " us, rocksdb save meta cost " << (now - save_meta_ts) << " us";
            Status rm_st = tablet->data_dir()->move_to_trash(tablet_path);
            if (!rm_st.ok()) {
                LOG(WARNING) << "fail to move dir to trash. " << tablet_path;
                return false;
            }
        }
        // remove tablet meta
        auto remove_st = TabletMetaManager::remove(tablet->data_dir(), tablet->tablet_id(),
                                                   tablet->schema_hash());
        if (!remove_st.ok()) {
            LOG(WARNING) << "failed to remove meta, tablet_id=" << tablet_meta->tablet_id()
                         << ", tablet_uid=" << tablet_meta->tablet_uid() << ", error=" << remove_st;
            return false;
        }
        LOG(INFO) << "successfully move tablet to trash. "
                  << "tablet_id=" << tablet->tablet_id()
                  << ", schema_hash=" << tablet->schema_hash() << ", tablet_path=" << tablet_path;
        return true;
    } else {
        tablet->clear_cache();
        // if could not find tablet info in meta store, then check if dir existed
        const auto& tablet_path = tablet->tablet_path();
        bool exists = false;
        Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists);
        if (!exists_st) {
            return false;
        }
        if (exists) {
            // Meta key is gone but the directory is still there: delete the directory
            // directly (it is NOT moved to trash in this branch).
            if (check_st.is<META_KEY_NOT_FOUND>()) {
                LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path "
                          << "tablet_id=" << tablet->tablet_id()
                          << ", schema_hash=" << tablet->schema_hash()
                          << ", delete tablet_path=" << tablet_path;
                RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path));
                RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path));
                return true;
            }
            // Any other meta-store error: keep the files and report failure.
            LOG(WARNING) << "errors while load meta from store, skip this tablet. "
                         << "tablet_id=" << tablet->tablet_id()
                         << ", schema_hash=" << tablet->schema_hash();
            return false;
        } else {
            LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. "
                      << "tablet_id=" << tablet->tablet_id()
                      << ", schema_hash=" << tablet->schema_hash()
                      << ", tablet_path=" << tablet_path;
            return true;
        }
    }
}
1316 | | |
1317 | 493 | Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) { |
1318 | 493 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1319 | 493 | std::thread::id thread_id = std::this_thread::get_id(); |
1320 | 493 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1321 | 493 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1322 | 493 | search == shard.tablets_under_transition.end()) { |
1323 | | // not found |
1324 | 491 | shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1); |
1325 | 491 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1326 | 491 | << ", lock times=1, thread_id_in_map=" << thread_id; |
1327 | 491 | return Status::OK(); |
1328 | 491 | } else { |
1329 | | // found |
1330 | 2 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1331 | 2 | if (thread_id != thread_id_in_map) { |
1332 | | // other thread, failed |
1333 | 0 | LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r |
1334 | 0 | << ", thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason |
1335 | 0 | << ", thread_id=" << thread_id; |
1336 | 0 | return Status::InternalError<false>("{} failed try later, tablet_id={}", reason, |
1337 | 0 | tablet_id); |
1338 | 0 | } |
1339 | | // add lock times |
1340 | 2 | ++lock_times; |
1341 | 2 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1342 | 2 | << ", lock times=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
1343 | 2 | return Status::OK(); |
1344 | 2 | } |
1345 | 493 | } |
1346 | | |
1347 | 493 | void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) { |
1348 | 493 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1349 | 493 | std::thread::id thread_id = std::this_thread::get_id(); |
1350 | 493 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1351 | 493 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1352 | 493 | search == shard.tablets_under_transition.end()) { |
1353 | | // impossible, bug |
1354 | 0 | DCHECK(false) << "tablet " << tablet_id |
1355 | 0 | << " must be found, before unreg must have been reg"; |
1356 | 493 | } else { |
1357 | 493 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1358 | 493 | if (thread_id_in_map != thread_id) { |
1359 | | // impossible, bug |
1360 | 0 | DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread"; |
1361 | 0 | } |
1362 | | // sub lock times |
1363 | 493 | --lock_times; |
1364 | 493 | if (lock_times != 0) { |
1365 | 2 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1366 | 2 | << ", left=" << lock_times << ", thread_id_in_map=" << thread_id_in_map; |
1367 | 491 | } else { |
1368 | 491 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1369 | 491 | << ", thread_id_in_map=" << thread_id_in_map; |
1370 | 491 | shard.tablets_under_transition.erase(tablet_id); |
1371 | 491 | } |
1372 | 493 | } |
1373 | 493 | } |
1374 | | |
1375 | | void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, |
1376 | | SchemaHash schema_hash, |
1377 | | const string& schema_hash_path, |
1378 | 10 | int16_t shard_id) { |
1379 | | // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks |
1380 | | // create tablet and load tablet task should check whether the dir exists |
1381 | 10 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1382 | 10 | std::shared_lock rdlock(shard.lock); |
1383 | | |
1384 | | // check if meta already exists |
1385 | 10 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
1386 | 10 | Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta); |
1387 | 10 | if (check_st.ok() && tablet_meta->shard_id() == shard_id) { |
1388 | 0 | return; |
1389 | 0 | } |
1390 | | |
1391 | 10 | LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path; |
1392 | | |
1393 | 10 | bool succ = register_transition_tablet(tablet_id, "path gc"); |
1394 | 10 | if (!succ) { |
1395 | 0 | return; |
1396 | 0 | } |
1397 | 10 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }}; |
1398 | | |
1399 | 10 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1400 | 10 | if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) { |
1401 | 0 | LOG(INFO) << "tablet exists, skip delete the path " << schema_hash_path; |
1402 | 0 | return; |
1403 | 0 | } |
1404 | | |
1405 | | // TODO(ygl): may do other checks in the future |
1406 | 10 | bool exists = false; |
1407 | 10 | Status exists_st = io::global_local_filesystem()->exists(schema_hash_path, &exists); |
1408 | 10 | if (exists_st && exists) { |
1409 | 10 | LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path; |
1410 | 10 | Status rm_st = data_dir->move_to_trash(schema_hash_path); |
1411 | 10 | if (!rm_st.ok()) { |
1412 | 0 | LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path; |
1413 | 10 | } else { |
1414 | 10 | LOG(INFO) << "move path " << schema_hash_path << " to trash successfully"; |
1415 | 10 | } |
1416 | 10 | } |
1417 | 10 | } |
1418 | | |
1419 | | void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map, |
1420 | 0 | size_t* tablet_count) { |
1421 | 0 | DCHECK(tablet_count); |
1422 | 0 | *tablet_count = 0; |
1423 | 0 | auto filter = [path_map, tablet_count](Tablet* t) -> bool { |
1424 | 0 | ++(*tablet_count); |
1425 | 0 | auto iter = path_map->find(t->data_dir()->path()); |
1426 | 0 | return iter != path_map->end() && iter->second.is_used; |
1427 | 0 | }; |
1428 | |
|
1429 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1430 | 0 | auto& data_dir_info = (*path_map)[tablet->data_dir()->path()]; |
1431 | 0 | data_dir_info.local_used_capacity += tablet->tablet_local_size(); |
1432 | 0 | data_dir_info.remote_used_capacity += tablet->tablet_remote_size(); |
1433 | 0 | }; |
1434 | |
|
1435 | 0 | for_each_tablet(handler, filter); |
1436 | 0 | } |
1437 | | |
1438 | | void TabletManager::get_partition_related_tablets(int64_t partition_id, |
1439 | 0 | std::set<TabletInfo>* tablet_infos) { |
1440 | 0 | std::shared_lock rdlock(_partitions_lock); |
1441 | 0 | auto it = _partitions.find(partition_id); |
1442 | 0 | if (it != _partitions.end()) { |
1443 | 0 | *tablet_infos = it->second.tablets; |
1444 | 0 | } |
1445 | 0 | } |
1446 | | |
1447 | 0 | void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>* partitions_version) { |
1448 | 0 | std::shared_lock rdlock(_partitions_lock); |
1449 | 0 | for (const auto& [partition_id, partition] : _partitions) { |
1450 | 0 | partitions_version->insert( |
1451 | 0 | {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); |
1452 | 0 | } |
1453 | 0 | } |
1454 | | |
1455 | | void TabletManager::update_partitions_visible_version( |
1456 | 0 | const std::map<int64_t, int64_t>& partitions_version) { |
1457 | 0 | std::shared_lock rdlock(_partitions_lock); |
1458 | 0 | for (auto [partition_id, version] : partitions_version) { |
1459 | 0 | auto it = _partitions.find(partition_id); |
1460 | 0 | if (it != _partitions.end()) { |
1461 | 0 | it->second.visible_version->update_version_monoto(version); |
1462 | 0 | } |
1463 | 0 | } |
1464 | 0 | } |
1465 | | |
1466 | 0 | void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { |
1467 | 0 | auto filter = [data_dir](Tablet* tablet) -> bool { |
1468 | 0 | return tablet->tablet_state() == TABLET_RUNNING && |
1469 | 0 | tablet->data_dir()->path_hash() == data_dir->path_hash() && tablet->is_used() && |
1470 | 0 | tablet->init_succeeded(); |
1471 | 0 | }; |
1472 | |
|
1473 | 0 | std::vector<TabletSharedPtr> related_tablets = get_all_tablet(filter); |
1474 | 0 | int counter = 0; |
1475 | 0 | MonotonicStopWatch watch; |
1476 | 0 | watch.start(); |
1477 | 0 | for (TabletSharedPtr tablet : related_tablets) { |
1478 | 0 | if (tablet->do_tablet_meta_checkpoint()) { |
1479 | 0 | ++counter; |
1480 | 0 | } |
1481 | 0 | } |
1482 | 0 | int64_t cost = watch.elapsed_time() / 1000 / 1000; |
1483 | 0 | LOG(INFO) << "finish to do meta checkpoint on dir: " << data_dir->path() |
1484 | 0 | << ", number: " << counter << ", cost(ms): " << cost; |
1485 | 0 | } |
1486 | | |
1487 | | Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, |
1488 | | const bool is_schema_change, |
1489 | | const Tablet* base_tablet, |
1490 | 300 | TabletMetaSharedPtr* tablet_meta) { |
1491 | 300 | uint32_t next_unique_id = 0; |
1492 | 300 | std::unordered_map<uint32_t, uint32_t> col_idx_to_unique_id; |
1493 | 300 | if (!is_schema_change) { |
1494 | 2.15k | for (uint32_t col_idx = 0; col_idx < request.tablet_schema.columns.size(); ++col_idx) { |
1495 | 1.85k | col_idx_to_unique_id[col_idx] = col_idx; |
1496 | 1.85k | } |
1497 | 300 | next_unique_id = request.tablet_schema.columns.size(); |
1498 | 300 | } else { |
1499 | 0 | next_unique_id = base_tablet->next_unique_id(); |
1500 | 0 | auto& new_columns = request.tablet_schema.columns; |
1501 | 0 | for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) { |
1502 | 0 | const TColumn& column = new_columns[new_col_idx]; |
1503 | | // For schema change, compare old_tablet and new_tablet: |
1504 | | // 1. if column exist in both new_tablet and old_tablet, choose the column's |
1505 | | // unique_id in old_tablet to be the column's ordinal number in new_tablet |
1506 | | // 2. if column exists only in new_tablet, assign next_unique_id of old_tablet |
1507 | | // to the new column |
1508 | 0 | int32_t old_col_idx = base_tablet->tablet_schema()->field_index(column.column_name); |
1509 | 0 | if (old_col_idx != -1) { |
1510 | 0 | uint32_t old_unique_id = |
1511 | 0 | base_tablet->tablet_schema()->column(old_col_idx).unique_id(); |
1512 | 0 | col_idx_to_unique_id[new_col_idx] = old_unique_id; |
1513 | 0 | } else { |
1514 | | // Not exist in old tablet, it is a new added column |
1515 | 0 | col_idx_to_unique_id[new_col_idx] = next_unique_id++; |
1516 | 0 | } |
1517 | 0 | } |
1518 | 0 | } |
1519 | 300 | VLOG_NOTICE << "creating tablet meta. next_unique_id=" << next_unique_id; |
1520 | | |
1521 | | // We generate a new tablet_uid for this new tablet. |
1522 | 300 | uint64_t shard_id = store->get_shard(); |
1523 | 300 | *tablet_meta = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id, |
1524 | 300 | col_idx_to_unique_id); |
1525 | 300 | if (request.__isset.storage_format) { |
1526 | 52 | if (request.storage_format == TStorageFormat::DEFAULT) { |
1527 | 0 | (*tablet_meta)->set_preferred_rowset_type(_engine.default_rowset_type()); |
1528 | 52 | } else if (request.storage_format == TStorageFormat::V1) { |
1529 | 0 | (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); |
1530 | 52 | } else if (request.storage_format == TStorageFormat::V2) { |
1531 | 52 | (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); |
1532 | 52 | } else { |
1533 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("invalid TStorageFormat: {}", |
1534 | 0 | request.storage_format); |
1535 | 0 | } |
1536 | 52 | } |
1537 | 300 | return Status::OK(); |
1538 | 300 | } |
1539 | | |
1540 | 3.56k | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { |
1541 | 3.56k | VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id; |
1542 | 3.56k | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
1543 | 3.56k | const auto& iter = tablet_map.find(tablet_id); |
1544 | 3.56k | if (iter != tablet_map.end()) { |
1545 | 3.01k | return iter->second; |
1546 | 3.01k | } |
1547 | 552 | return nullptr; |
1548 | 3.56k | } |
1549 | | |
1550 | 304 | void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { |
1551 | 304 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1552 | 304 | auto& partition = _partitions[tablet->partition_id()]; |
1553 | 304 | partition.tablets.insert(tablet->get_tablet_info()); |
1554 | 304 | tablet->set_visible_version( |
1555 | 304 | std::static_pointer_cast<const VersionWithTime>(partition.visible_version)); |
1556 | 304 | } |
1557 | | |
1558 | 255 | void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { |
1559 | 255 | tablet->set_visible_version(nullptr); |
1560 | 255 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1561 | 255 | auto it = _partitions.find(tablet->partition_id()); |
1562 | 255 | if (it == _partitions.end()) { |
1563 | 0 | return; |
1564 | 0 | } |
1565 | | |
1566 | 255 | auto& tablets = it->second.tablets; |
1567 | 255 | tablets.erase(tablet->get_tablet_info()); |
1568 | 255 | if (tablets.empty()) { |
1569 | 32 | _partitions.erase(it); |
1570 | 32 | } |
1571 | 255 | } |
1572 | | |
1573 | | void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info, |
1574 | 0 | int64_t num) { |
1575 | 0 | for (const auto& tablets_shard : _tablets_shards) { |
1576 | 0 | std::shared_lock rdlock(tablets_shard.lock); |
1577 | 0 | for (const auto& item : tablets_shard.tablet_map) { |
1578 | 0 | TabletSharedPtr tablet = item.second; |
1579 | 0 | if (tablets_info.size() >= num) { |
1580 | 0 | return; |
1581 | 0 | } |
1582 | 0 | if (tablet == nullptr) { |
1583 | 0 | continue; |
1584 | 0 | } |
1585 | 0 | tablets_info.push_back(tablet->get_tablet_info()); |
1586 | 0 | } |
1587 | 0 | } |
1588 | 0 | } |
1589 | | |
1590 | 3.25k | std::shared_mutex& TabletManager::_get_tablets_shard_lock(TTabletId tabletId) { |
1591 | 3.25k | return _get_tablets_shard(tabletId).lock; |
1592 | 3.25k | } |
1593 | | |
1594 | 4.45k | TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) { |
1595 | 4.45k | return _get_tablets_shard(tabletId).tablet_map; |
1596 | 4.45k | } |
1597 | | |
1598 | 8.70k | TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tabletId) { |
1599 | 8.70k | return _tablets_shards[tabletId & _tablets_shards_mask]; |
1600 | 8.70k | } |
1601 | | |
1602 | | void TabletManager::get_tablets_distribution_on_different_disks( |
1603 | | std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk, |
1604 | 0 | std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk) { |
1605 | 0 | std::vector<DataDir*> data_dirs = _engine.get_stores(); |
1606 | 0 | std::map<int64_t, Partition> partitions; |
1607 | 0 | { |
1608 | | // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. |
1609 | | // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and |
1610 | | // copy _partitions here. |
1611 | 0 | std::shared_lock rdlock(_partitions_lock); |
1612 | 0 | partitions = _partitions; |
1613 | 0 | } |
1614 | |
|
1615 | 0 | for (const auto& [partition_id, partition] : partitions) { |
1616 | 0 | std::map<DataDir*, int64_t> tablets_num; |
1617 | 0 | std::map<DataDir*, std::vector<TabletSize>> tablets_info; |
1618 | 0 | for (auto* data_dir : data_dirs) { |
1619 | 0 | tablets_num[data_dir] = 0; |
1620 | 0 | } |
1621 | |
|
1622 | 0 | for (const auto& tablet_info : partition.tablets) { |
1623 | | // get_tablet() will hold 'tablet_shard_lock' |
1624 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); |
1625 | 0 | if (tablet == nullptr) { |
1626 | 0 | continue; |
1627 | 0 | } |
1628 | 0 | DataDir* data_dir = tablet->data_dir(); |
1629 | 0 | size_t tablet_footprint = tablet->tablet_footprint(); |
1630 | 0 | tablets_num[data_dir]++; |
1631 | 0 | TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint); |
1632 | 0 | tablets_info[data_dir].push_back(tablet_size); |
1633 | 0 | } |
1634 | 0 | tablets_num_on_disk[partition_id] = tablets_num; |
1635 | 0 | tablets_info_on_disk[partition_id] = tablets_info; |
1636 | 0 | } |
1637 | 0 | } |
1638 | | |
1639 | | struct SortCtx { |
1640 | | SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t cooldown_timestamp, |
1641 | | int64_t file_size) |
1642 | 0 | : tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {} |
1643 | | TabletSharedPtr tablet; |
1644 | | RowsetSharedPtr rowset; |
1645 | | // to ensure the tablet with -1 would always be greater than other |
1646 | | uint64_t cooldown_timestamp; |
1647 | | int64_t file_size; |
1648 | 0 | bool operator<(const SortCtx& other) const { |
1649 | 0 | if (this->cooldown_timestamp == other.cooldown_timestamp) { |
1650 | 0 | return this->file_size > other.file_size; |
1651 | 0 | } |
1652 | 0 | return this->cooldown_timestamp < other.cooldown_timestamp; |
1653 | 0 | } |
1654 | | }; |
1655 | | |
1656 | | void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets, |
1657 | | std::vector<RowsetSharedPtr>* rowsets, |
1658 | 0 | std::function<bool(const TabletSharedPtr&)> skip_tablet) { |
1659 | 0 | std::vector<SortCtx> sort_ctx_vec; |
1660 | 0 | std::vector<std::weak_ptr<Tablet>> candidates; |
1661 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { candidates.emplace_back(tablet); }, |
1662 | 0 | filter_all_tablets); |
1663 | 0 | auto get_cooldown_tablet = [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) { |
1664 | 0 | const TabletSharedPtr& tablet = t.lock(); |
1665 | 0 | RowsetSharedPtr rowset = nullptr; |
1666 | 0 | if (UNLIKELY(nullptr == tablet)) { |
1667 | 0 | return; |
1668 | 0 | } |
1669 | 0 | int64_t cooldown_timestamp = -1; |
1670 | 0 | size_t file_size = -1; |
1671 | 0 | if (!skip_tablet(tablet) && |
1672 | 0 | (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size))) { |
1673 | 0 | sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp, file_size); |
1674 | 0 | } |
1675 | 0 | }; |
1676 | 0 | std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet); |
1677 | |
|
1678 | 0 | std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end()); |
1679 | |
|
1680 | 0 | for (SortCtx& ctx : sort_ctx_vec) { |
1681 | 0 | VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id(); |
1682 | 0 | tablets->push_back(std::move(ctx.tablet)); |
1683 | 0 | rowsets->push_back(std::move(ctx.rowset)); |
1684 | 0 | } |
1685 | 0 | } |
1686 | | |
1687 | 0 | void TabletManager::get_all_tablets_storage_format(TCheckStorageFormatResult* result) { |
1688 | 0 | DCHECK(result != nullptr); |
1689 | 0 | auto handler = [result](const TabletSharedPtr& tablet) { |
1690 | 0 | if (tablet->all_beta()) { |
1691 | 0 | result->v2_tablets.push_back(tablet->tablet_id()); |
1692 | 0 | } else { |
1693 | 0 | result->v1_tablets.push_back(tablet->tablet_id()); |
1694 | 0 | } |
1695 | 0 | }; |
1696 | |
|
1697 | 0 | for_each_tablet(handler, filter_all_tablets); |
1698 | 0 | result->__isset.v1_tablets = true; |
1699 | 0 | result->__isset.v2_tablets = true; |
1700 | 0 | } |
1701 | | |
1702 | 0 | std::set<int64_t> TabletManager::check_all_tablet_segment(bool repair) { |
1703 | 0 | std::set<int64_t> bad_tablets; |
1704 | 0 | std::map<int64_t, std::vector<int64_t>> repair_shard_bad_tablets; |
1705 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1706 | 0 | if (!tablet->check_all_rowset_segment()) { |
1707 | 0 | int64_t tablet_id = tablet->tablet_id(); |
1708 | 0 | bad_tablets.insert(tablet_id); |
1709 | 0 | if (repair) { |
1710 | 0 | repair_shard_bad_tablets[tablet_id & _tablets_shards_mask].push_back(tablet_id); |
1711 | 0 | } |
1712 | 0 | } |
1713 | 0 | }; |
1714 | 0 | for_each_tablet(handler, filter_all_tablets); |
1715 | |
|
1716 | 0 | for (const auto& [shard_index, shard_tablets] : repair_shard_bad_tablets) { |
1717 | 0 | auto& tablets_shard = _tablets_shards[shard_index]; |
1718 | 0 | auto& tablet_map = tablets_shard.tablet_map; |
1719 | 0 | std::lock_guard<std::shared_mutex> wrlock(tablets_shard.lock); |
1720 | 0 | for (auto tablet_id : shard_tablets) { |
1721 | 0 | auto it = tablet_map.find(tablet_id); |
1722 | 0 | if (it == tablet_map.end()) { |
1723 | 0 | bad_tablets.erase(tablet_id); |
1724 | 0 | LOG(WARNING) << "Bad tablet has be removed. tablet_id=" << tablet_id; |
1725 | 0 | } else { |
1726 | 0 | const auto& tablet = it->second; |
1727 | 0 | static_cast<void>(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
1728 | 0 | tablet->save_meta(); |
1729 | 0 | { |
1730 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock( |
1731 | 0 | _shutdown_tablets_lock); |
1732 | 0 | _shutdown_tablets.push_back(tablet); |
1733 | 0 | } |
1734 | 0 | LOG(WARNING) << "There are some segments lost, set tablet to shutdown state." |
1735 | 0 | << "tablet_id=" << tablet->tablet_id() |
1736 | 0 | << ", tablet_path=" << tablet->tablet_path(); |
1737 | 0 | } |
1738 | 0 | } |
1739 | 0 | } |
1740 | |
|
1741 | 0 | return bad_tablets; |
1742 | 0 | } |
1743 | | |
1744 | | bool TabletManager::update_tablet_partition_id(::doris::TPartitionId partition_id, |
1745 | 0 | ::doris::TTabletId tablet_id) { |
1746 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
1747 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1748 | 0 | if (tablet == nullptr) { |
1749 | 0 | LOG(WARNING) << "get tablet err partition_id: " << partition_id |
1750 | 0 | << " tablet_id:" << tablet_id; |
1751 | 0 | return false; |
1752 | 0 | } |
1753 | 0 | _remove_tablet_from_partition(tablet); |
1754 | 0 | auto st = tablet->tablet_meta()->set_partition_id(partition_id); |
1755 | 0 | if (!st.ok()) { |
1756 | 0 | LOG(WARNING) << "set partition id err partition_id: " << partition_id |
1757 | 0 | << " tablet_id:" << tablet_id; |
1758 | 0 | return false; |
1759 | 0 | } |
1760 | 0 | _add_tablet_to_partition(tablet); |
1761 | 0 | return true; |
1762 | 0 | } |
1763 | | |
1764 | | void TabletManager::get_topn_tablet_delete_bitmap_score( |
1765 | 0 | uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) { |
1766 | 0 | int64_t max_delete_bitmap_score_tablet_id = 0; |
1767 | 0 | int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0; |
1768 | 0 | OlapStopWatch watch; |
1769 | 0 | uint64_t total_delete_map_count = 0; |
1770 | 0 | int n = config::check_tablet_delete_bitmap_score_top_n; |
1771 | 0 | std::vector<std::pair<std::shared_ptr<Tablet>, int64_t>> buf; |
1772 | 0 | buf.reserve(n + 1); |
1773 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1774 | 0 | uint64_t delete_bitmap_count = |
1775 | 0 | tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); |
1776 | 0 | total_delete_map_count += delete_bitmap_count; |
1777 | 0 | if (delete_bitmap_count > *max_delete_bitmap_score) { |
1778 | 0 | max_delete_bitmap_score_tablet_id = tablet->tablet_id(); |
1779 | 0 | *max_delete_bitmap_score = delete_bitmap_count; |
1780 | 0 | } |
1781 | 0 | buf.emplace_back(std::move(tablet), delete_bitmap_count); |
1782 | 0 | std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); |
1783 | 0 | if (buf.size() > n) { |
1784 | 0 | buf.pop_back(); |
1785 | 0 | } |
1786 | 0 | }; |
1787 | 0 | for_each_tablet(handler, filter_all_tablets); |
1788 | 0 | for (auto& [t, _] : buf) { |
1789 | 0 | t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score, |
1790 | 0 | &max_base_rowset_delete_bitmap_score_tablet_id); |
1791 | 0 | } |
1792 | 0 | std::stringstream ss; |
1793 | 0 | for (auto& i : buf) { |
1794 | 0 | ss << i.first->tablet_id() << ":" << i.second << ","; |
1795 | 0 | } |
1796 | 0 | LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n |
1797 | 0 | << ",tablet size=" << _tablets_shards.size() |
1798 | 0 | << ",total_delete_map_count=" << total_delete_map_count |
1799 | 0 | << ",cost(us)=" << watch.get_elapse_time_us() |
1800 | 0 | << ",max_delete_bitmap_score=" << *max_delete_bitmap_score |
1801 | 0 | << ",max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id |
1802 | 0 | << ",max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score |
1803 | 0 | << ",max_base_rowset_delete_bitmap_score_tablet_id=" |
1804 | 0 | << max_base_rowset_delete_bitmap_score_tablet_id << ",tablets=[" << ss.str() << "]"; |
1805 | 0 | } |
1806 | | |
1807 | | } // end namespace doris |