/root/doris/be/src/olap/tablet_manager.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/tablet_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/AgentService_types.h> |
22 | | #include <gen_cpp/BackendService_types.h> |
23 | | #include <gen_cpp/Descriptors_types.h> |
24 | | #include <gen_cpp/MasterService_types.h> |
25 | | #include <gen_cpp/Types_types.h> |
26 | | #include <gen_cpp/olap_file.pb.h> |
27 | | #include <re2/re2.h> |
28 | | #include <unistd.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <list> |
32 | | #include <mutex> |
33 | | #include <ostream> |
34 | | #include <string_view> |
35 | | |
36 | | #include "bvar/bvar.h" |
37 | | #include "common/compiler_util.h" // IWYU pragma: keep |
38 | | #include "common/config.h" |
39 | | #include "common/logging.h" |
40 | | #include "gutil/integral_types.h" |
41 | | #include "gutil/strings/strcat.h" |
42 | | #include "gutil/strings/substitute.h" |
43 | | #include "io/fs/local_file_system.h" |
44 | | #include "olap/cumulative_compaction_time_series_policy.h" |
45 | | #include "olap/data_dir.h" |
46 | | #include "olap/olap_common.h" |
47 | | #include "olap/olap_define.h" |
48 | | #include "olap/olap_meta.h" |
49 | | #include "olap/pb_helper.h" |
50 | | #include "olap/rowset/beta_rowset.h" |
51 | | #include "olap/rowset/rowset.h" |
52 | | #include "olap/rowset/rowset_meta_manager.h" |
53 | | #include "olap/storage_engine.h" |
54 | | #include "olap/tablet.h" |
55 | | #include "olap/tablet_meta.h" |
56 | | #include "olap/tablet_meta_manager.h" |
57 | | #include "olap/tablet_schema.h" |
58 | | #include "olap/txn_manager.h" |
59 | | #include "runtime/exec_env.h" |
60 | | #include "runtime/memory/mem_tracker.h" |
61 | | #include "runtime/thread_context.h" |
62 | | #include "service/backend_options.h" |
63 | | #include "util/defer_op.h" |
64 | | #include "util/doris_metrics.h" |
65 | | #include "util/histogram.h" |
66 | | #include "util/metrics.h" |
67 | | #include "util/path_util.h" |
68 | | #include "util/scoped_cleanup.h" |
69 | | #include "util/stopwatch.hpp" |
70 | | #include "util/time.h" |
71 | | #include "util/trace.h" |
72 | | #include "util/uid_util.h" |
73 | | |
74 | | namespace doris { |
75 | | class CumulativeCompactionPolicy; |
76 | | } // namespace doris |
77 | | |
78 | | using std::map; |
79 | | using std::set; |
80 | | using std::string; |
81 | | using std::vector; |
82 | | |
83 | | namespace doris { |
84 | | using namespace ErrorCode; |
85 | | |
86 | | DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTES, "", |
87 | | mem_consumption, Labels({{"type", "tablet_meta"}})); |
88 | | |
89 | | bvar::Adder<int64_t> g_tablet_meta_schema_columns_count("tablet_meta_schema_columns_count"); |
90 | | |
91 | | TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size) |
92 | | : _engine(engine), |
93 | | _tablet_meta_mem_tracker(std::make_shared<MemTracker>("TabletMeta(experimental)")), |
94 | | _tablets_shards_size(tablet_map_lock_shard_size), |
95 | 150 | _tablets_shards_mask(tablet_map_lock_shard_size - 1) { |
96 | 150 | CHECK_GT(_tablets_shards_size, 0); |
97 | 150 | CHECK_EQ(_tablets_shards_size & _tablets_shards_mask, 0); |
98 | 150 | _tablets_shards.resize(_tablets_shards_size); |
99 | 150 | REGISTER_HOOK_METRIC(tablet_meta_mem_consumption, |
100 | 150 | [this]() { return _tablet_meta_mem_tracker->consumption(); }); |
101 | 150 | } |
102 | | |
103 | 150 | TabletManager::~TabletManager() { |
104 | | #ifndef BE_TEST |
105 | | DEREGISTER_HOOK_METRIC(tablet_meta_mem_consumption); |
106 | | #endif |
107 | 150 | } |
108 | | |
109 | | Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet, |
110 | 93 | bool update_meta, bool force, RuntimeProfile* profile) { |
111 | 93 | if (profile->get_counter("AddTablet") == nullptr) { |
112 | 2 | ADD_TIMER(profile, "AddTablet"); |
113 | 2 | } |
114 | 93 | Status res = Status::OK(); |
115 | 93 | VLOG_NOTICE << "begin to add tablet to TabletManager. " |
116 | 0 | << "tablet_id=" << tablet_id << ", force=" << force; |
117 | | |
118 | 93 | TabletSharedPtr existed_tablet = nullptr; |
119 | 93 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
120 | 93 | const auto& iter = tablet_map.find(tablet_id); |
121 | 93 | if (iter != tablet_map.end()) { |
122 | 2 | existed_tablet = iter->second; |
123 | 2 | } |
124 | | |
125 | 93 | if (existed_tablet == nullptr) { |
126 | 91 | return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/, |
127 | 91 | false /*drop_old*/, profile); |
128 | 91 | } |
129 | | // During restore process, the tablet is exist and snapshot loader will replace the tablet's rowsets |
130 | | // and then reload the tablet, the tablet's path will the same |
131 | 2 | if (!force) { |
132 | 2 | if (existed_tablet->tablet_path() == tablet->tablet_path()) { |
133 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
134 | 0 | "add the same tablet twice! tablet_id={}, tablet_path={}", tablet_id, |
135 | 0 | tablet->tablet_path()); |
136 | 0 | } |
137 | 2 | if (existed_tablet->data_dir() == tablet->data_dir()) { |
138 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
139 | 0 | "add tablet with same data dir twice! tablet_id={}", tablet_id); |
140 | 0 | } |
141 | 2 | } |
142 | | |
143 | 2 | MonotonicStopWatch watch; |
144 | 2 | watch.start(); |
145 | | |
146 | | // During storage migration, the tablet is moved to another disk, have to check |
147 | | // if the new tablet's rowset version is larger than the old one to prevent losting data during |
148 | | // migration |
149 | 2 | int64_t old_time, new_time; |
150 | 2 | int32_t old_version, new_version; |
151 | 2 | { |
152 | 2 | std::shared_lock rdlock(existed_tablet->get_header_lock()); |
153 | 2 | const RowsetSharedPtr old_rowset = existed_tablet->get_rowset_with_max_version(); |
154 | 2 | const RowsetSharedPtr new_rowset = tablet->get_rowset_with_max_version(); |
155 | | // If new tablet is empty, it is a newly created schema change tablet. |
156 | | // the old tablet is dropped before add tablet. it should not exist old tablet |
157 | 2 | if (new_rowset == nullptr) { |
158 | | // it seems useless to call unlock and return here. |
159 | | // it could prevent error when log level is changed in the future. |
160 | 0 | return Status::Error<ENGINE_INSERT_EXISTS_TABLE>( |
161 | 0 | "new tablet is empty and old tablet exists. it should not happen. tablet_id={}", |
162 | 0 | tablet_id); |
163 | 0 | } |
164 | 2 | old_time = old_rowset == nullptr ? -1 : old_rowset->creation_time(); |
165 | 2 | new_time = new_rowset->creation_time(); |
166 | 2 | old_version = old_rowset == nullptr ? -1 : old_rowset->end_version(); |
167 | 2 | new_version = new_rowset->end_version(); |
168 | 2 | } |
169 | 2 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetExistTabletVersion", "AddTablet"), |
170 | 2 | static_cast<int64_t>(watch.reset())); |
171 | | |
172 | | // In restore process, we replace all origin files in tablet dir with |
173 | | // the downloaded snapshot files. Then we try to reload tablet header. |
174 | | // force == true means we forcibly replace the Tablet in tablet_map |
175 | | // with the new one. But if we do so, the files in the tablet dir will be |
176 | | // dropped when the origin Tablet deconstruct. |
177 | | // So we set keep_files == true to not delete files when the |
178 | | // origin Tablet deconstruct. |
179 | | // During restore process, snapshot loader |
180 | | // replaced the old tablet's rowset with new rowsets, but the tablet path is reused, if drop files |
181 | | // here, the new rowset's file will also be dropped, so use keep files here |
182 | 2 | bool keep_files = force; |
183 | 2 | if (force || |
184 | 2 | (new_version > old_version || (new_version == old_version && new_time >= old_time))) { |
185 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
186 | 2 | res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files, |
187 | 2 | true /*drop_old*/, profile); |
188 | 2 | } else { |
189 | 0 | RETURN_IF_ERROR(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
190 | 0 | tablet->save_meta(); |
191 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
192 | 0 | static_cast<int64_t>(watch.reset())); |
193 | 0 | { |
194 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
195 | 0 | _shutdown_tablets.push_back(tablet); |
196 | 0 | } |
197 | |
|
198 | 0 | res = Status::Error<ENGINE_INSERT_OLD_TABLET>( |
199 | 0 | "set tablet to shutdown state. tablet_id={}, tablet_path={}", tablet->tablet_id(), |
200 | 0 | tablet->tablet_path()); |
201 | 0 | } |
202 | 2 | LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res |
203 | 2 | << ", tablet_id=" << tablet_id << ", old_version=" << old_version |
204 | 2 | << ", new_version=" << new_version << ", old_time=" << old_time |
205 | 2 | << ", new_time=" << new_time |
206 | 2 | << ", old_tablet_path=" << existed_tablet->tablet_path() |
207 | 2 | << ", new_tablet_path=" << tablet->tablet_path(); |
208 | | |
209 | 2 | return res; |
210 | 2 | } |
211 | | |
212 | | Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, |
213 | | const TabletSharedPtr& tablet, bool update_meta, |
214 | | bool keep_files, bool drop_old, |
215 | 93 | RuntimeProfile* profile) { |
216 | | // check if new tablet's meta is in store and add new tablet's meta to meta store |
217 | 93 | Status res = Status::OK(); |
218 | 93 | MonotonicStopWatch watch; |
219 | 93 | watch.start(); |
220 | 93 | if (update_meta) { |
221 | | // call tablet save meta in order to valid the meta |
222 | 93 | tablet->save_meta(); |
223 | 93 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "SaveMeta", "AddTablet"), |
224 | 93 | static_cast<int64_t>(watch.reset())); |
225 | 93 | } |
226 | 93 | if (drop_old) { |
227 | | // If the new tablet is fresher than the existing one, then replace |
228 | | // the existing tablet with the new one. |
229 | | // Use default replica_id to ignore whether replica_id is match when drop tablet. |
230 | 2 | Status status = _drop_tablet(tablet_id, /* replica_id */ 0, keep_files, false, true); |
231 | 2 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropOldTablet", "AddTablet"), |
232 | 2 | static_cast<int64_t>(watch.reset())); |
233 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN( |
234 | 2 | status, strings::Substitute("failed to drop old tablet when add new tablet. " |
235 | 2 | "tablet_id=$0", |
236 | 2 | tablet_id)); |
237 | 2 | } |
238 | | // Register tablet into DataDir, so that we can manage tablet from |
239 | | // the perspective of root path. |
240 | | // Example: unregister all tables when a bad disk found. |
241 | 93 | tablet->register_tablet_into_dir(); |
242 | 93 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
243 | 93 | tablet_map[tablet_id] = tablet; |
244 | 93 | _add_tablet_to_partition(tablet); |
245 | | // TODO: remove multiply 2 of tablet meta mem size |
246 | | // Because table schema will copy in tablet, there will be double mem cost |
247 | | // so here multiply 2 |
248 | 93 | _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2); |
249 | 93 | g_tablet_meta_schema_columns_count << tablet->tablet_meta()->tablet_columns_num(); |
250 | 93 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RegisterTabletInfo", "AddTablet"), |
251 | 93 | static_cast<int64_t>(watch.reset())); |
252 | | |
253 | 93 | VLOG_NOTICE << "add tablet to map successfully." |
254 | 0 | << " tablet_id=" << tablet_id; |
255 | | |
256 | 93 | return res; |
257 | 93 | } |
258 | | |
259 | 0 | bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) { |
260 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
261 | 0 | return _check_tablet_id_exist_unlocked(tablet_id); |
262 | 0 | } |
263 | | |
264 | 0 | bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { |
265 | 0 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
266 | 0 | return tablet_map.find(tablet_id) != tablet_map.end(); |
267 | 0 | } |
268 | | |
269 | | Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores, |
270 | 94 | RuntimeProfile* profile) { |
271 | 94 | DorisMetrics::instance()->create_tablet_requests_total->increment(1); |
272 | | |
273 | 94 | int64_t tablet_id = request.tablet_id; |
274 | 94 | LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id |
275 | 94 | << ", table_id=" << request.table_id << ", partition_id=" << request.partition_id |
276 | 94 | << ", replica_id=" << request.replica_id << ", stores.size=" << stores.size() |
277 | 94 | << ", first store=" << stores[0]->path(); |
278 | | |
279 | | // when we create rollup tablet A(assume on shard-1) from tablet B(assume on shard-2) |
280 | | // we need use write lock on shard-1 and then use read lock on shard-2 |
281 | | // if there have create rollup tablet C(assume on shard-2) from tablet D(assume on shard-1) at the same time, we will meet deadlock |
282 | 94 | std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock); |
283 | 94 | bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
284 | 94 | bool is_schema_change_or_atomic_restore = |
285 | 94 | request.__isset.base_tablet_id && request.base_tablet_id > 0; |
286 | 94 | bool need_two_lock = |
287 | 94 | is_schema_change_or_atomic_restore && |
288 | 94 | ((_tablets_shards_mask & request.base_tablet_id) != (_tablets_shards_mask & tablet_id)); |
289 | 94 | if (need_two_lock) { |
290 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock")); |
291 | 0 | two_tablet_lock.lock(); |
292 | 0 | } |
293 | | |
294 | 94 | MonotonicStopWatch shard_lock_watch; |
295 | 94 | shard_lock_watch.start(); |
296 | 94 | std::lock_guard wrlock(_get_tablets_shard_lock(tablet_id)); |
297 | 94 | shard_lock_watch.stop(); |
298 | 94 | COUNTER_UPDATE(ADD_TIMER(profile, "GetShardLock"), |
299 | 94 | static_cast<int64_t>(shard_lock_watch.elapsed_time())); |
300 | | // Make create_tablet operation to be idempotent: |
301 | | // 1. Return true if tablet with same tablet_id and schema_hash exist; |
302 | | // false if tablet with same tablet_id but different schema_hash exist. |
303 | | // 2. When this is an alter task, if the tablet(both tablet_id and schema_hash are |
304 | | // same) already exist, then just return true(an duplicate request). But if |
305 | | // tablet_id exist but with different schema_hash, return an error(report task will |
306 | | // eventually trigger its deletion). |
307 | 94 | { |
308 | 94 | SCOPED_TIMER(ADD_TIMER(profile, "GetTabletUnlocked")); |
309 | 94 | if (_get_tablet_unlocked(tablet_id) != nullptr) { |
310 | 3 | LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id; |
311 | 3 | return Status::OK(); |
312 | 3 | } |
313 | 94 | } |
314 | | |
315 | 91 | TabletSharedPtr base_tablet = nullptr; |
316 | | // If the CreateTabletReq has base_tablet_id then it is a alter-tablet request |
317 | 91 | if (is_schema_change_or_atomic_restore) { |
318 | | // if base_tablet_id's lock diffrent with new_tablet_id, we need lock it. |
319 | 0 | if (need_two_lock) { |
320 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet")); |
321 | 0 | base_tablet = get_tablet(request.base_tablet_id); |
322 | 0 | two_tablet_lock.unlock(); |
323 | 0 | } else { |
324 | 0 | SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTabletUnlocked")); |
325 | 0 | base_tablet = _get_tablet_unlocked(request.base_tablet_id); |
326 | 0 | } |
327 | 0 | if (base_tablet == nullptr) { |
328 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
329 | 0 | return Status::Error<TABLE_CREATE_META_ERROR>( |
330 | 0 | "fail to create tablet(change schema/atomic restore), base tablet does not " |
331 | 0 | "exist. new_tablet_id={}, base_tablet_id={}", |
332 | 0 | tablet_id, request.base_tablet_id); |
333 | 0 | } |
334 | | // If we are doing schema-change or atomic-restore, we should use the same data dir |
335 | | // TODO(lingbin): A litter trick here, the directory should be determined before |
336 | | // entering this method |
337 | | // |
338 | | // ATTN: Since all restored replicas will be saved to HDD, so no storage_medium check here. |
339 | 0 | if (in_restore_mode || |
340 | 0 | request.storage_medium == base_tablet->data_dir()->storage_medium()) { |
341 | 0 | LOG(INFO) << "create tablet use the base tablet data dir. tablet_id=" << tablet_id |
342 | 0 | << ", base tablet_id=" << request.base_tablet_id |
343 | 0 | << ", data dir=" << base_tablet->data_dir()->path(); |
344 | 0 | stores.clear(); |
345 | 0 | stores.push_back(base_tablet->data_dir()); |
346 | 0 | } |
347 | 0 | } |
348 | | |
349 | | // set alter type to schema-change. it is useless |
350 | 91 | TabletSharedPtr tablet = _internal_create_tablet_unlocked( |
351 | 91 | request, is_schema_change_or_atomic_restore, base_tablet.get(), stores, profile); |
352 | 91 | if (tablet == nullptr) { |
353 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
354 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet. tablet_id={}", |
355 | 0 | request.tablet_id); |
356 | 0 | } |
357 | | |
358 | 91 | LOG(INFO) << "success to create tablet. tablet_id=" << tablet_id |
359 | 91 | << ", tablet_path=" << tablet->tablet_path(); |
360 | 91 | return Status::OK(); |
361 | 91 | } |
362 | | |
363 | | TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( |
364 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
365 | 91 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
366 | | // If in schema-change state, base_tablet must also be provided. |
367 | | // i.e., is_schema_change and base_tablet are either assigned or not assigned |
368 | 91 | DCHECK((is_schema_change && base_tablet) || (!is_schema_change && !base_tablet)); |
369 | | |
370 | | // NOTE: The existence of tablet_id and schema_hash has already been checked, |
371 | | // no need check again here. |
372 | | |
373 | 91 | const std::string parent_timer_name = "InternalCreateTablet"; |
374 | 91 | SCOPED_TIMER(ADD_TIMER(profile, parent_timer_name)); |
375 | | |
376 | 91 | MonotonicStopWatch watch; |
377 | 91 | watch.start(); |
378 | 91 | auto create_meta_timer = ADD_CHILD_TIMER(profile, "CreateMeta", parent_timer_name); |
379 | 91 | auto tablet = _create_tablet_meta_and_dir_unlocked(request, is_schema_change, base_tablet, |
380 | 91 | data_dirs, profile); |
381 | 91 | COUNTER_UPDATE(create_meta_timer, static_cast<int64_t>(watch.reset())); |
382 | 91 | if (tablet == nullptr) { |
383 | 0 | return nullptr; |
384 | 0 | } |
385 | | |
386 | 91 | int64_t new_tablet_id = request.tablet_id; |
387 | 91 | int32_t new_schema_hash = request.tablet_schema.schema_hash; |
388 | | |
389 | | // should remove the tablet's pending_id no matter create-tablet success or not |
390 | 91 | DataDir* data_dir = tablet->data_dir(); |
391 | | |
392 | | // TODO(yiguolei) |
393 | | // the following code is very difficult to understand because it mixed alter tablet v2 |
394 | | // and alter tablet v1 should remove alter tablet v1 code after v0.12 |
395 | 91 | Status res = Status::OK(); |
396 | 91 | bool is_tablet_added = false; |
397 | 91 | do { |
398 | 91 | res = tablet->init(); |
399 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "TabletInit", parent_timer_name), |
400 | 91 | static_cast<int64_t>(watch.reset())); |
401 | 91 | if (!res.ok()) { |
402 | 0 | LOG(WARNING) << "tablet init failed. tablet:" << tablet->tablet_id(); |
403 | 0 | break; |
404 | 0 | } |
405 | | |
406 | | // Create init version if this is not a restore mode replica and request.version is set |
407 | | // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; |
408 | | // if (!in_restore_mode && request.__isset.version) { |
409 | | // create initial rowset before add it to storage engine could omit many locks |
410 | 91 | res = tablet->create_initial_rowset(request.version); |
411 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "InitRowset", parent_timer_name), |
412 | 91 | static_cast<int64_t>(watch.reset())); |
413 | 91 | if (!res.ok()) { |
414 | 0 | LOG(WARNING) << "fail to create initial version for tablet. res=" << res; |
415 | 0 | break; |
416 | 0 | } |
417 | | |
418 | 91 | if (is_schema_change) { |
419 | | // if this is a new alter tablet, has to set its state to not ready |
420 | | // because schema change handler depends on it to check whether history data |
421 | | // convert finished |
422 | 0 | static_cast<void>(tablet->set_tablet_state(TabletState::TABLET_NOTREADY)); |
423 | 0 | } |
424 | | // Add tablet to StorageEngine will make it visible to user |
425 | | // Will persist tablet meta |
426 | 91 | auto add_tablet_timer = ADD_CHILD_TIMER(profile, "AddTablet", parent_timer_name); |
427 | 91 | res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false, profile); |
428 | 91 | COUNTER_UPDATE(add_tablet_timer, static_cast<int64_t>(watch.reset())); |
429 | 91 | if (!res.ok()) { |
430 | 0 | LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; |
431 | 0 | break; |
432 | 0 | } |
433 | 91 | is_tablet_added = true; |
434 | | |
435 | | // TODO(lingbin): The following logic seems useless, can be removed? |
436 | | // Because if _add_tablet_unlocked() return OK, we must can get it from map. |
437 | 91 | TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id); |
438 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "GetTablet", parent_timer_name), |
439 | 91 | static_cast<int64_t>(watch.reset())); |
440 | 91 | if (tablet_ptr == nullptr) { |
441 | 0 | res = Status::Error<TABLE_NOT_FOUND>("fail to get tablet. res={}", res); |
442 | 0 | break; |
443 | 0 | } |
444 | 91 | } while (false); |
445 | | |
446 | 91 | if (res.ok()) { |
447 | 91 | return tablet; |
448 | 91 | } |
449 | | // something is wrong, we need clear environment |
450 | 0 | if (is_tablet_added) { |
451 | 0 | Status status = _drop_tablet(new_tablet_id, request.replica_id, false, false, true); |
452 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "DropTablet", parent_timer_name), |
453 | 0 | static_cast<int64_t>(watch.reset())); |
454 | 0 | if (!status.ok()) { |
455 | 0 | LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; |
456 | 0 | } |
457 | 0 | } else { |
458 | 0 | tablet->delete_all_files(); |
459 | 0 | static_cast<void>(TabletMetaManager::remove(data_dir, new_tablet_id, new_schema_hash)); |
460 | 0 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemoveTabletFiles", parent_timer_name), |
461 | 0 | static_cast<int64_t>(watch.reset())); |
462 | 0 | } |
463 | 0 | return nullptr; |
464 | 91 | } |
465 | | |
466 | 91 | static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t tablet_id) { |
467 | 91 | string path = dir; |
468 | 91 | path = path_util::join_path_segments(path, DATA_PREFIX); |
469 | 91 | path = path_util::join_path_segments(path, std::to_string(shard_id)); |
470 | 91 | path = path_util::join_path_segments(path, std::to_string(tablet_id)); |
471 | 91 | return path; |
472 | 91 | } |
473 | | |
474 | | TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( |
475 | | const TCreateTabletReq& request, const bool is_schema_change, const Tablet* base_tablet, |
476 | 91 | const std::vector<DataDir*>& data_dirs, RuntimeProfile* profile) { |
477 | 91 | string pending_id = StrCat(TABLET_ID_PREFIX, request.tablet_id); |
478 | | // Many attempts are made here in the hope that even if a disk fails, it can still continue. |
479 | 91 | std::string parent_timer_name = "CreateMeta"; |
480 | 91 | MonotonicStopWatch watch; |
481 | 91 | watch.start(); |
482 | 91 | for (auto& data_dir : data_dirs) { |
483 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "RemovePendingIds", parent_timer_name), |
484 | 91 | static_cast<int64_t>(watch.reset())); |
485 | | |
486 | 91 | TabletMetaSharedPtr tablet_meta; |
487 | | // if create meta failed, do not need to clean dir, because it is only in memory |
488 | 91 | Status res = _create_tablet_meta_unlocked(request, data_dir, is_schema_change, base_tablet, |
489 | 91 | &tablet_meta); |
490 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateMetaUnlock", parent_timer_name), |
491 | 91 | static_cast<int64_t>(watch.reset())); |
492 | 91 | if (!res.ok()) { |
493 | 0 | LOG(WARNING) << "fail to create tablet meta. res=" << res |
494 | 0 | << ", root=" << data_dir->path(); |
495 | 0 | continue; |
496 | 0 | } |
497 | | |
498 | 91 | string tablet_dir = |
499 | 91 | _gen_tablet_dir(data_dir->path(), tablet_meta->shard_id(), request.tablet_id); |
500 | 91 | string schema_hash_dir = path_util::join_path_segments( |
501 | 91 | tablet_dir, std::to_string(request.tablet_schema.schema_hash)); |
502 | | |
503 | | // Because the tablet is removed asynchronously, so that the dir may still exist when BE |
504 | | // receive create-tablet request again, For example retried schema-change request |
505 | 91 | bool exists = true; |
506 | 91 | res = io::global_local_filesystem()->exists(schema_hash_dir, &exists); |
507 | 91 | if (!res.ok()) { |
508 | 0 | continue; |
509 | 0 | } |
510 | 91 | if (exists) { |
511 | 0 | LOG(WARNING) << "skip this dir because tablet path exist, path=" << schema_hash_dir; |
512 | 0 | continue; |
513 | 91 | } else { |
514 | 91 | Status st = io::global_local_filesystem()->create_directory(schema_hash_dir); |
515 | 91 | if (!st.ok()) { |
516 | 0 | continue; |
517 | 0 | } |
518 | 91 | } |
519 | | |
520 | 91 | if (tablet_meta->partition_id() <= 0) { |
521 | 26 | LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << ", tablet " |
522 | 26 | << tablet_meta->tablet_id(); |
523 | 26 | } |
524 | 91 | TabletSharedPtr new_tablet = |
525 | 91 | std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
526 | 91 | COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateTabletFromMeta", parent_timer_name), |
527 | 91 | static_cast<int64_t>(watch.reset())); |
528 | 91 | return new_tablet; |
529 | 91 | } |
530 | 0 | return nullptr; |
531 | 91 | } |
532 | | |
533 | | Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, |
534 | 45 | bool is_drop_table_or_partition) { |
535 | 45 | return _drop_tablet(tablet_id, replica_id, false, is_drop_table_or_partition, false); |
536 | 45 | } |
537 | | |
538 | | // Drop specified tablet. |
539 | | Status TabletManager::_drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files, |
540 | 47 | bool is_drop_table_or_partition, bool had_held_shard_lock) { |
541 | 47 | LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id |
542 | 47 | << ", is_drop_table_or_partition=" << is_drop_table_or_partition; |
543 | 47 | DorisMetrics::instance()->drop_tablet_requests_total->increment(1); |
544 | | |
545 | 47 | RETURN_IF_ERROR(register_transition_tablet(tablet_id, "drop tablet")); |
546 | 47 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "drop tablet"); }}; |
547 | | |
548 | | // Fetch tablet which need to be dropped |
549 | 47 | TabletSharedPtr to_drop_tablet; |
550 | 47 | { |
551 | 47 | std::unique_lock<std::shared_mutex> wlock(_get_tablets_shard_lock(tablet_id), |
552 | 47 | std::defer_lock); |
553 | 47 | if (!had_held_shard_lock) { |
554 | 45 | wlock.lock(); |
555 | 45 | } |
556 | 47 | to_drop_tablet = _get_tablet_unlocked(tablet_id); |
557 | 47 | if (to_drop_tablet == nullptr) { |
558 | 1 | LOG(WARNING) << "fail to drop tablet because it does not exist. " |
559 | 1 | << "tablet_id=" << tablet_id; |
560 | 1 | return Status::OK(); |
561 | 1 | } |
562 | | |
563 | | // We should compare replica id to avoid dropping new cloned tablet. |
564 | | // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. |
565 | 46 | if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { |
566 | 0 | return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(), |
567 | 0 | replica_id); |
568 | 0 | } |
569 | | |
570 | 46 | _remove_tablet_from_partition(to_drop_tablet); |
571 | 46 | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
572 | 46 | tablet_map.erase(tablet_id); |
573 | 46 | } |
574 | | |
575 | 0 | to_drop_tablet->clear_cache(); |
576 | | |
577 | 46 | if (!keep_files) { |
578 | | // drop tablet will update tablet meta, should lock |
579 | 46 | std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock()); |
580 | 46 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); |
581 | 46 | LOG(INFO) << "set tablet to shutdown state and remove it from memory. " |
582 | 46 | << "tablet_id=" << tablet_id << ", tablet_path=" << to_drop_tablet->tablet_path(); |
583 | | // NOTE: has to update tablet here, but must not update tablet meta directly. |
584 | | // because other thread may hold the tablet object, they may save meta too. |
585 | | // If update meta directly here, other thread may override the meta |
586 | | // and the tablet will be loaded at restart time. |
587 | | // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. |
588 | 46 | RETURN_IF_ERROR(to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN)); |
589 | | // We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta, |
590 | | // otherwise if BE shutdown after saving tablet state, these remote rowsets path info will lost. |
591 | 46 | if (is_drop_table_or_partition) { |
592 | 0 | RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets()); |
593 | 0 | } |
594 | 46 | to_drop_tablet->save_meta(); |
595 | 46 | { |
596 | 46 | std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
597 | 46 | _shutdown_tablets.push_back(to_drop_tablet); |
598 | 46 | } |
599 | 46 | } |
600 | | |
601 | 46 | to_drop_tablet->deregister_tablet_from_dir(); |
602 | 46 | _tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() * 2); |
603 | 46 | g_tablet_meta_schema_columns_count << -to_drop_tablet->tablet_meta()->tablet_columns_num(); |
604 | 46 | return Status::OK(); |
605 | 46 | } |
606 | | |
607 | 2.26k | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) { |
608 | 2.26k | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
609 | 2.26k | return _get_tablet_unlocked(tablet_id, include_deleted, err); |
610 | 2.26k | } |
611 | | |
612 | 0 | std::vector<TabletSharedPtr> TabletManager::get_all_tablet(std::function<bool(Tablet*)>&& filter) { |
613 | 0 | std::vector<TabletSharedPtr> res; |
614 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { res.emplace_back(tablet); }, |
615 | 0 | std::move(filter)); |
616 | 0 | return res; |
617 | 0 | } |
618 | | |
619 | | void TabletManager::for_each_tablet(std::function<void(const TabletSharedPtr&)>&& handler, |
620 | 8 | std::function<bool(Tablet*)>&& filter) { |
621 | 8 | std::vector<TabletSharedPtr> tablets; |
622 | 8 | for (const auto& tablets_shard : _tablets_shards) { |
623 | 8 | tablets.clear(); |
624 | 8 | { |
625 | 8 | std::shared_lock rdlock(tablets_shard.lock); |
626 | 52 | for (const auto& [id, tablet] : tablets_shard.tablet_map) { |
627 | 52 | if (filter(tablet.get())) { |
628 | 52 | tablets.emplace_back(tablet); |
629 | 52 | } |
630 | 52 | } |
631 | 8 | } |
632 | 52 | for (const auto& tablet : tablets) { |
633 | 52 | handler(tablet); |
634 | 52 | } |
635 | 8 | } |
636 | 8 | } |
637 | | |
638 | | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, |
639 | 2.26k | string* err) { |
640 | 2.26k | TabletSharedPtr tablet; |
641 | 2.26k | tablet = _get_tablet_unlocked(tablet_id); |
642 | 2.26k | if (tablet == nullptr && include_deleted) { |
643 | 4 | std::shared_lock rdlock(_shutdown_tablets_lock); |
644 | 4 | for (auto& deleted_tablet : _shutdown_tablets) { |
645 | 2 | CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr"; |
646 | 2 | if (deleted_tablet->tablet_id() == tablet_id) { |
647 | 2 | tablet = deleted_tablet; |
648 | 2 | break; |
649 | 2 | } |
650 | 2 | } |
651 | 4 | } |
652 | | |
653 | 2.26k | if (tablet == nullptr) { |
654 | 37 | if (err != nullptr) { |
655 | 1 | *err = "tablet does not exist. " + BackendOptions::get_localhost(); |
656 | 1 | } |
657 | 37 | return nullptr; |
658 | 37 | } |
659 | | #ifndef BE_TEST |
660 | | if (!tablet->is_used()) { |
661 | | LOG(WARNING) << "tablet cannot be used. tablet=" << tablet_id; |
662 | | if (err != nullptr) { |
663 | | *err = "tablet cannot be used. " + BackendOptions::get_localhost(); |
664 | | } |
665 | | return nullptr; |
666 | | } |
667 | | #endif |
668 | | |
669 | 2.23k | return tablet; |
670 | 2.26k | } |
671 | | |
672 | | TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, TabletUid tablet_uid, |
673 | 0 | bool include_deleted, string* err) { |
674 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
675 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, include_deleted, err); |
676 | 0 | if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) { |
677 | 0 | return tablet; |
678 | 0 | } |
679 | 0 | return nullptr; |
680 | 0 | } |
681 | | |
682 | 0 | uint64_t TabletManager::get_rowset_nums() { |
683 | 0 | uint64_t rowset_nums = 0; |
684 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { rowset_nums += tablet->version_count(); }, |
685 | 0 | filter_all_tablets); |
686 | 0 | return rowset_nums; |
687 | 0 | } |
688 | | |
689 | 0 | uint64_t TabletManager::get_segment_nums() { |
690 | 0 | uint64_t segment_nums = 0; |
691 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { segment_nums += tablet->segment_count(); }, |
692 | 0 | filter_all_tablets); |
693 | 0 | return segment_nums; |
694 | 0 | } |
695 | | |
696 | | bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, |
697 | | TTabletId* tablet_id, |
698 | 37 | TSchemaHash* schema_hash) { |
699 | | // the path like: /data/14/10080/964828783/ |
700 | 37 | static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); |
701 | | // match tablet schema hash data path, for example, the path is /data/1/16791/29998 |
702 | | // 1 is shard id , 16791 is tablet id, 29998 is schema hash |
703 | 37 | if (RE2::PartialMatch(path, normal_re, tablet_id, schema_hash)) { |
704 | 33 | return true; |
705 | 33 | } |
706 | | |
707 | | // If we can't match normal path pattern, this may be a path which is a empty tablet |
708 | | // directory. Use this pattern to match empty tablet directory. In this case schema_hash |
709 | | // will be set to zero. |
710 | 4 | static re2::RE2 empty_tablet_re("/data/\\d+/(\\d+)($|/$)"); |
711 | 4 | if (!RE2::PartialMatch(path, empty_tablet_re, tablet_id)) { |
712 | 2 | return false; |
713 | 2 | } |
714 | 2 | *schema_hash = 0; |
715 | 2 | return true; |
716 | 4 | } |
717 | | |
718 | 3 | bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { |
719 | | // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat |
720 | 3 | static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); |
721 | 3 | string id_str; |
722 | 3 | bool ret = RE2::PartialMatch(path, re, &id_str); |
723 | 3 | if (ret) { |
724 | 1 | rowset_id->init(id_str); |
725 | 1 | return true; |
726 | 1 | } |
727 | 2 | return false; |
728 | 3 | } |
729 | | |
730 | 0 | void TabletManager::get_tablet_stat(TTabletStatResult* result) { |
731 | 0 | std::shared_ptr<std::vector<TTabletStat>> local_cache; |
732 | 0 | { |
733 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
734 | 0 | local_cache = _tablet_stat_list_cache; |
735 | 0 | } |
736 | 0 | result->__set_tablet_stat_list(*local_cache); |
737 | 0 | } |
738 | | |
739 | | std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( |
740 | | CompactionType compaction_type, DataDir* data_dir, |
741 | | const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, |
742 | | const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& |
743 | 3 | all_cumulative_compaction_policies) { |
744 | 3 | int64_t now_ms = UnixMillis(); |
745 | 3 | const string& compaction_type_str = |
746 | 3 | compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative"; |
747 | 3 | uint32_t highest_score = 0; |
748 | | // find the single compaction tablet |
749 | 3 | uint32_t single_compact_highest_score = 0; |
750 | 3 | TabletSharedPtr best_tablet; |
751 | 3 | TabletSharedPtr best_single_compact_tablet; |
752 | 51 | auto handler = [&](const TabletSharedPtr& tablet_ptr) { |
753 | 51 | if (tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) { |
754 | 0 | LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id() |
755 | 0 | << " will be ignored by automatic compaction tasks since it's " |
756 | 0 | << "set to disabled automatic compaction."; |
757 | 0 | return; |
758 | 0 | } |
759 | | |
760 | 51 | if (config::enable_skip_tablet_compaction && |
761 | 51 | tablet_ptr->should_skip_compaction(compaction_type, UnixSeconds())) { |
762 | 0 | return; |
763 | 0 | } |
764 | 51 | if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { |
765 | 0 | return; |
766 | 0 | } |
767 | | |
768 | 51 | auto search = tablet_submitted_compaction.find(tablet_ptr); |
769 | 51 | if (search != tablet_submitted_compaction.end()) { |
770 | 0 | return; |
771 | 0 | } |
772 | | |
773 | 51 | int64_t last_failure_ms = tablet_ptr->last_cumu_compaction_failure_time(); |
774 | 51 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
775 | 0 | last_failure_ms = tablet_ptr->last_base_compaction_failure_time(); |
776 | 0 | } |
777 | 51 | if (now_ms - last_failure_ms <= 5000) { |
778 | 0 | VLOG_DEBUG << "Too often to check compaction, skip it. " |
779 | 0 | << "compaction_type=" << compaction_type_str |
780 | 0 | << ", last_failure_time_ms=" << last_failure_ms |
781 | 0 | << ", tablet_id=" << tablet_ptr->tablet_id(); |
782 | 0 | return; |
783 | 0 | } |
784 | | |
785 | 51 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
786 | 0 | std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(), |
787 | 0 | std::try_to_lock); |
788 | 0 | if (!lock.owns_lock()) { |
789 | 0 | LOG(INFO) << "can not get base lock: " << tablet_ptr->tablet_id(); |
790 | 0 | return; |
791 | 0 | } |
792 | 51 | } else { |
793 | 51 | std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(), |
794 | 51 | std::try_to_lock); |
795 | 51 | if (!lock.owns_lock()) { |
796 | 0 | LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id(); |
797 | 0 | return; |
798 | 0 | } |
799 | 51 | } |
800 | 51 | auto cumulative_compaction_policy = all_cumulative_compaction_policies.at( |
801 | 51 | tablet_ptr->tablet_meta()->compaction_policy()); |
802 | 51 | uint32_t current_compaction_score = tablet_ptr->calc_compaction_score(); |
803 | 51 | if (current_compaction_score < 5) { |
804 | 0 | tablet_ptr->set_skip_compaction(true, compaction_type, UnixSeconds()); |
805 | 0 | } |
806 | | |
807 | | // tablet should do single compaction |
808 | 51 | if (current_compaction_score > single_compact_highest_score && |
809 | 51 | tablet_ptr->should_fetch_from_peer()) { |
810 | 2 | bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
811 | 2 | cumulative_compaction_policy); |
812 | 2 | if (ret) { |
813 | 2 | single_compact_highest_score = current_compaction_score; |
814 | 2 | best_single_compact_tablet = tablet_ptr; |
815 | 2 | } |
816 | 2 | } |
817 | | |
818 | | // tablet should do cumu or base compaction |
819 | 51 | if (current_compaction_score > highest_score && !tablet_ptr->should_fetch_from_peer()) { |
820 | 3 | bool ret = tablet_ptr->suitable_for_compaction(compaction_type, |
821 | 3 | cumulative_compaction_policy); |
822 | 3 | if (ret) { |
823 | 3 | highest_score = current_compaction_score; |
824 | 3 | best_tablet = tablet_ptr; |
825 | 3 | } |
826 | 3 | } |
827 | 51 | }; |
828 | | |
829 | 3 | for_each_tablet(handler, filter_all_tablets); |
830 | 3 | std::vector<TabletSharedPtr> picked_tablet; |
831 | 3 | if (best_tablet != nullptr) { |
832 | 3 | VLOG_CRITICAL << "Found the best tablet for compaction. " |
833 | 0 | << "compaction_type=" << compaction_type_str |
834 | 0 | << ", tablet_id=" << best_tablet->tablet_id() << ", path=" << data_dir->path() |
835 | 0 | << ", highest_score=" << highest_score |
836 | 0 | << ", fetch from peer: " << best_tablet->should_fetch_from_peer(); |
837 | 3 | picked_tablet.emplace_back(std::move(best_tablet)); |
838 | 3 | } |
839 | | |
840 | | // pick single compaction tablet needs the highest score |
841 | 3 | if (best_single_compact_tablet != nullptr && single_compact_highest_score >= highest_score) { |
842 | 1 | VLOG_CRITICAL << "Found the best tablet for single compaction. " |
843 | 0 | << "compaction_type=" << compaction_type_str |
844 | 0 | << ", tablet_id=" << best_single_compact_tablet->tablet_id() |
845 | 0 | << ", path=" << data_dir->path() |
846 | 0 | << ", highest_score=" << single_compact_highest_score << ", fetch from peer: " |
847 | 0 | << best_single_compact_tablet->should_fetch_from_peer(); |
848 | 1 | picked_tablet.emplace_back(std::move(best_single_compact_tablet)); |
849 | 1 | } |
850 | 3 | *score = highest_score > single_compact_highest_score ? highest_score |
851 | 3 | : single_compact_highest_score; |
852 | 3 | return picked_tablet; |
853 | 3 | } |
854 | | |
855 | | Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id, |
856 | | TSchemaHash schema_hash, std::string_view meta_binary, |
857 | | bool update_meta, bool force, bool restore, |
858 | 2 | bool check_path) { |
859 | 2 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
860 | 2 | Status status = tablet_meta->deserialize(meta_binary); |
861 | 2 | if (!status.ok()) { |
862 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
863 | 0 | "fail to load tablet because can not parse meta_binary string. tablet_id={}, " |
864 | 0 | "schema_hash={}, path={}, status={}", |
865 | 0 | tablet_id, schema_hash, data_dir->path(), status); |
866 | 0 | } |
867 | | |
868 | | // check if tablet meta is valid |
869 | 2 | if (tablet_meta->tablet_id() != tablet_id || tablet_meta->schema_hash() != schema_hash) { |
870 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
871 | 0 | "fail to load tablet because meet invalid tablet meta. trying to load " |
872 | 0 | "tablet(tablet_id={}, schema_hash={}), but meet tablet={}, path={}", |
873 | 0 | tablet_id, schema_hash, tablet_meta->tablet_id(), data_dir->path()); |
874 | 0 | } |
875 | 2 | if (tablet_meta->tablet_uid().hi == 0 && tablet_meta->tablet_uid().lo == 0) { |
876 | 0 | return Status::Error<HEADER_PB_PARSE_FAILED>( |
877 | 0 | "fail to load tablet because its uid == 0. tablet={}, path={}", |
878 | 0 | tablet_meta->tablet_id(), data_dir->path()); |
879 | 0 | } |
880 | | |
881 | 2 | if (restore) { |
882 | | // we're restoring tablet from trash, tablet state should be changed from shutdown back to running |
883 | 0 | tablet_meta->set_tablet_state(TABLET_RUNNING); |
884 | 0 | } |
885 | | |
886 | 2 | if (tablet_meta->partition_id() == 0) { |
887 | 0 | LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0"; |
888 | 0 | } |
889 | | |
890 | 2 | TabletSharedPtr tablet = std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir); |
891 | | |
892 | | // NOTE: method load_tablet_from_meta could be called by two cases as below |
893 | | // case 1: BE start; |
894 | | // case 2: Clone Task/Restore |
895 | | // For case 1 doesn't need path check because BE is just starting and not ready, |
896 | | // just check tablet meta status to judge whether tablet is delete is enough. |
897 | | // For case 2, If a tablet has just been copied to local BE, |
898 | | // it may be cleared by gc-thread(see perform_tablet_gc) because the tablet meta may not be loaded to memory. |
899 | | // So clone task should check path and then failed and retry in this case. |
900 | 2 | if (check_path) { |
901 | 2 | bool exists = true; |
902 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(tablet->tablet_path(), &exists)); |
903 | 2 | if (!exists) { |
904 | 0 | return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
905 | 0 | "tablet path not exists, create tablet failed, path={}", tablet->tablet_path()); |
906 | 0 | } |
907 | 2 | } |
908 | | |
909 | 2 | if (tablet->tablet_meta()->tablet_state() == TABLET_SHUTDOWN) { |
910 | 0 | { |
911 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock); |
912 | 0 | _shutdown_tablets.push_back(tablet); |
913 | 0 | } |
914 | 0 | return Status::Error<TABLE_ALREADY_DELETED_ERROR>( |
915 | 0 | "fail to load tablet because it is to be deleted. tablet_id={}, schema_hash={}, " |
916 | 0 | "path={}", |
917 | 0 | tablet_id, schema_hash, data_dir->path()); |
918 | 0 | } |
919 | | // NOTE: We do not check tablet's initial version here, because if BE restarts when |
920 | | // one tablet is doing schema-change, we may meet empty tablet. |
921 | 2 | if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) { |
922 | | // tablet state is invalid, drop tablet |
923 | 0 | return Status::Error<TABLE_INDEX_VALIDATE_ERROR>( |
924 | 0 | "fail to load tablet. it is in running state but without delta. tablet={}, path={}", |
925 | 0 | tablet->tablet_id(), data_dir->path()); |
926 | 0 | } |
927 | | |
928 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN( |
929 | 2 | tablet->init(), |
930 | 2 | strings::Substitute("tablet init failed. tablet=$0", tablet->tablet_id())); |
931 | | |
932 | 2 | RuntimeProfile profile("CreateTablet"); |
933 | 2 | std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id)); |
934 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN( |
935 | 2 | _add_tablet_unlocked(tablet_id, tablet, update_meta, force, &profile), |
936 | 2 | strings::Substitute("fail to add tablet. tablet=$0", tablet->tablet_id())); |
937 | | |
938 | 2 | return Status::OK(); |
939 | 2 | } |
940 | | |
941 | | Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, |
942 | | SchemaHash schema_hash, const string& schema_hash_path, |
943 | 2 | bool force, bool restore) { |
944 | 2 | LOG(INFO) << "begin to load tablet from dir. " |
945 | 2 | << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash |
946 | 2 | << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; |
947 | | // not add lock here, because load_tablet_from_meta already add lock |
948 | 2 | std::string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id); |
949 | | // should change shard id before load tablet |
950 | 2 | std::string shard_path = |
951 | 2 | path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path))); |
952 | 2 | std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1); |
953 | 2 | int32_t shard = stol(shard_str); |
954 | | |
955 | 2 | bool exists = false; |
956 | 2 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(header_path, &exists)); |
957 | 2 | if (!exists) { |
958 | 0 | return Status::Error<NOT_FOUND>("fail to find header file. [header_path={}]", header_path); |
959 | 0 | } |
960 | | |
961 | 2 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
962 | 2 | if (!tablet_meta->create_from_file(header_path).ok()) { |
963 | 0 | return Status::Error<ENGINE_LOAD_INDEX_TABLE_ERROR>( |
964 | 0 | "fail to load tablet_meta. file_path={}", header_path); |
965 | 0 | } |
966 | 2 | TabletUid tablet_uid = TabletUid::gen_uid(); |
967 | | |
968 | | // remove rowset binlog metas |
969 | 2 | auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", schema_hash_path); |
970 | 2 | bool binlog_metas_file_exists = false; |
971 | 2 | auto file_exists_status = |
972 | 2 | io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); |
973 | 2 | if (!file_exists_status.ok()) { |
974 | 0 | return file_exists_status; |
975 | 0 | } |
976 | 2 | bool contain_binlog = false; |
977 | 2 | RowsetBinlogMetasPB rowset_binlog_metas_pb; |
978 | 2 | if (binlog_metas_file_exists) { |
979 | 0 | auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); |
980 | 0 | if (binlog_meta_filesize > 0) { |
981 | 0 | contain_binlog = true; |
982 | 0 | RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); |
983 | 0 | VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; |
984 | 0 | } |
985 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); |
986 | 0 | } |
987 | 2 | if (contain_binlog) { |
988 | 0 | auto binlog_dir = fmt::format("{}/_binlog", schema_hash_path); |
989 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); |
990 | | |
991 | 0 | std::vector<io::FileInfo> files; |
992 | 0 | RETURN_IF_ERROR( |
993 | 0 | io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); |
994 | 0 | for (auto& file : files) { |
995 | 0 | auto& filename = file.file_name; |
996 | 0 | std::string new_suffix; |
997 | 0 | std::string old_suffix; |
998 | |
|
999 | 0 | if (filename.ends_with(".binlog")) { |
1000 | 0 | old_suffix = ".binlog"; |
1001 | 0 | new_suffix = ".dat"; |
1002 | 0 | } else if (filename.ends_with(".binlog-index")) { |
1003 | 0 | old_suffix = ".binlog-index"; |
1004 | 0 | new_suffix = ".idx"; |
1005 | 0 | } else { |
1006 | 0 | continue; |
1007 | 0 | } |
1008 | | |
1009 | 0 | std::string new_filename = filename; |
1010 | 0 | new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(), |
1011 | 0 | new_suffix); |
1012 | 0 | auto from = fmt::format("{}/{}", schema_hash_path, filename); |
1013 | 0 | auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); |
1014 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); |
1015 | 0 | } |
1016 | | |
1017 | 0 | auto* meta = store->get_meta(); |
1018 | | // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas |
1019 | 0 | RETURN_IF_ERROR( |
1020 | 0 | RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); |
1021 | 0 | } |
1022 | | |
1023 | | // has to change shard id here, because meta file maybe copied from other source |
1024 | | // its shard is different from local shard |
1025 | 2 | tablet_meta->set_shard_id(shard); |
1026 | | // load dir is called by clone, restore, storage migration |
1027 | | // should change tablet uid when tablet object changed |
1028 | 2 | tablet_meta->set_tablet_uid(std::move(tablet_uid)); |
1029 | 2 | std::string meta_binary; |
1030 | 2 | tablet_meta->serialize(&meta_binary); |
1031 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN( |
1032 | 2 | load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force, restore, |
1033 | 2 | true), |
1034 | 2 | strings::Substitute("fail to load tablet. header_path=$0", header_path)); |
1035 | | |
1036 | 2 | return Status::OK(); |
1037 | 2 | } |
1038 | | |
1039 | 0 | Status TabletManager::report_tablet_info(TTabletInfo* tablet_info) { |
1040 | 0 | LOG(INFO) << "begin to process report tablet info." |
1041 | 0 | << "tablet_id=" << tablet_info->tablet_id; |
1042 | |
|
1043 | 0 | Status res = Status::OK(); |
1044 | |
|
1045 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id); |
1046 | 0 | if (tablet == nullptr) { |
1047 | 0 | return Status::Error<TABLE_NOT_FOUND>("can't find tablet={}", tablet_info->tablet_id); |
1048 | 0 | } |
1049 | | |
1050 | 0 | tablet->build_tablet_report_info(tablet_info); |
1051 | 0 | VLOG_TRACE << "success to process report tablet info."; |
1052 | 0 | return res; |
1053 | 0 | } |
1054 | | |
1055 | 0 | void TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info) { |
1056 | 0 | DCHECK(tablets_info != nullptr); |
1057 | 0 | VLOG_NOTICE << "begin to build all report tablets info"; |
1058 | | |
1059 | | // build the expired txn map first, outside the tablet map lock |
1060 | 0 | std::map<TabletInfo, std::vector<int64_t>> expire_txn_map; |
1061 | 0 | _engine.txn_manager()->build_expire_txn_map(&expire_txn_map); |
1062 | 0 | LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; |
1063 | |
|
1064 | 0 | HistogramStat tablet_version_num_hist; |
1065 | 0 | auto local_cache = std::make_shared<std::vector<TTabletStat>>(); |
1066 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1067 | 0 | auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; |
1068 | 0 | TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back(); |
1069 | 0 | tablet->build_tablet_report_info(&tablet_info, true, true); |
1070 | | // find expired transaction corresponding to this tablet |
1071 | 0 | TabletInfo tinfo(tablet->tablet_id(), tablet->tablet_uid()); |
1072 | 0 | auto find = expire_txn_map.find(tinfo); |
1073 | 0 | if (find != expire_txn_map.end()) { |
1074 | 0 | tablet_info.__set_transaction_ids(find->second); |
1075 | 0 | expire_txn_map.erase(find); |
1076 | 0 | } |
1077 | 0 | tablet_version_num_hist.add(tablet_info.total_version_count); |
1078 | 0 | auto& t_tablet_stat = local_cache->emplace_back(); |
1079 | 0 | t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); |
1080 | 0 | t_tablet_stat.__set_data_size(tablet_info.data_size); |
1081 | 0 | t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); |
1082 | 0 | t_tablet_stat.__set_row_count(tablet_info.row_count); |
1083 | 0 | t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); |
1084 | 0 | t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); |
1085 | 0 | t_tablet_stat.__set_visible_version(tablet_info.version); |
1086 | 0 | }; |
1087 | 0 | for_each_tablet(handler, filter_all_tablets); |
1088 | |
|
1089 | 0 | { |
1090 | 0 | std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex); |
1091 | 0 | _tablet_stat_list_cache.swap(local_cache); |
1092 | 0 | } |
1093 | 0 | DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( |
1094 | 0 | tablet_version_num_hist); |
1095 | 0 | LOG(INFO) << "success to build all report tablets info. tablet_count=" << tablets_info->size(); |
1096 | 0 | } |
1097 | | |
1098 | 5 | Status TabletManager::start_trash_sweep() { |
1099 | 5 | DBUG_EXECUTE_IF("TabletManager.start_trash_sweep.sleep", DBUG_BLOCK); |
1100 | 5 | std::unique_lock<std::mutex> lock(_gc_tablets_lock, std::defer_lock); |
1101 | 5 | if (!lock.try_lock()) { |
1102 | 0 | return Status::OK(); |
1103 | 0 | } |
1104 | | |
1105 | 5 | for_each_tablet([](const TabletSharedPtr& tablet) { tablet->delete_expired_stale_rowset(); }, |
1106 | 5 | filter_all_tablets); |
1107 | | |
1108 | 5 | std::list<TabletSharedPtr>::iterator last_it; |
1109 | 5 | { |
1110 | 5 | std::shared_lock rdlock(_shutdown_tablets_lock); |
1111 | 5 | last_it = _shutdown_tablets.begin(); |
1112 | 5 | if (last_it == _shutdown_tablets.end()) { |
1113 | 0 | return Status::OK(); |
1114 | 0 | } |
1115 | 5 | } |
1116 | | |
1117 | 9 | auto get_batch_tablets = [this, &last_it](int limit) { |
1118 | 9 | std::vector<TabletSharedPtr> batch_tablets; |
1119 | 9 | std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); |
1120 | 33 | while (last_it != _shutdown_tablets.end() && batch_tablets.size() < limit) { |
1121 | | // it means current tablet is referenced by other thread |
1122 | 24 | if (last_it->use_count() > 1) { |
1123 | 1 | last_it++; |
1124 | 23 | } else { |
1125 | 23 | batch_tablets.push_back(*last_it); |
1126 | 23 | last_it = _shutdown_tablets.erase(last_it); |
1127 | 23 | } |
1128 | 24 | } |
1129 | | |
1130 | 9 | return batch_tablets; |
1131 | 9 | }; |
1132 | | |
1133 | 5 | std::list<TabletSharedPtr> failed_tablets; |
1134 | | // return true if need continue delete |
1135 | 5 | auto delete_one_batch = [this, get_batch_tablets, &failed_tablets]() -> bool { |
1136 | 5 | int limit = 200; |
1137 | 9 | for (;;) { |
1138 | 9 | auto batch_tablets = get_batch_tablets(limit); |
1139 | 23 | for (const auto& tablet : batch_tablets) { |
1140 | 23 | if (_move_tablet_to_trash(tablet)) { |
1141 | 23 | limit--; |
1142 | 23 | } else { |
1143 | 0 | failed_tablets.push_back(tablet); |
1144 | 0 | } |
1145 | 23 | } |
1146 | 9 | if (limit <= 0) { |
1147 | 0 | return true; |
1148 | 0 | } |
1149 | 9 | if (batch_tablets.empty()) { |
1150 | 5 | return false; |
1151 | 5 | } |
1152 | 9 | } |
1153 | | |
1154 | 0 | return false; |
1155 | 5 | }; |
1156 | | |
1157 | 5 | while (delete_one_batch()) { |
1158 | | #ifndef BE_TEST |
1159 | | sleep(1); |
1160 | | #endif |
1161 | 0 | } |
1162 | | |
1163 | 5 | if (!failed_tablets.empty()) { |
1164 | 0 | std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock); |
1165 | 0 | _shutdown_tablets.splice(_shutdown_tablets.end(), failed_tablets); |
1166 | 0 | } |
1167 | | |
1168 | 5 | return Status::OK(); |
1169 | 5 | } |
1170 | | |
1171 | 23 | bool TabletManager::_move_tablet_to_trash(const TabletSharedPtr& tablet) { |
1172 | 23 | RETURN_IF_ERROR(register_transition_tablet(tablet->tablet_id(), "move to trash")); |
1173 | 23 | Defer defer {[&]() { unregister_transition_tablet(tablet->tablet_id(), "move to trash"); }}; |
1174 | | |
1175 | 23 | TabletSharedPtr tablet_in_not_shutdown = get_tablet(tablet->tablet_id()); |
1176 | 23 | if (tablet_in_not_shutdown) { |
1177 | 0 | TSchemaHash schema_hash_not_shutdown = tablet_in_not_shutdown->schema_hash(); |
1178 | 0 | size_t path_hash_not_shutdown = tablet_in_not_shutdown->data_dir()->path_hash(); |
1179 | 0 | if (tablet->schema_hash() == schema_hash_not_shutdown && |
1180 | 0 | tablet->data_dir()->path_hash() == path_hash_not_shutdown) { |
1181 | 0 | tablet->clear_cache(); |
1182 | | // shard_id in memory not eq shard_id in shutdown |
1183 | 0 | if (tablet_in_not_shutdown->tablet_path() != tablet->tablet_path()) { |
1184 | 0 | LOG(INFO) << "tablet path not eq shutdown tablet path, move it to trash, tablet_id=" |
1185 | 0 | << tablet_in_not_shutdown->tablet_id() |
1186 | 0 | << " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
1187 | 0 | << " shutdown tablet path=" << tablet->tablet_path(); |
1188 | 0 | return tablet->data_dir()->move_to_trash(tablet->tablet_path()); |
1189 | 0 | } else { |
1190 | 0 | LOG(INFO) << "tablet path eq shutdown tablet path, not move to trash, tablet_id=" |
1191 | 0 | << tablet_in_not_shutdown->tablet_id() |
1192 | 0 | << " mem manager tablet path=" << tablet_in_not_shutdown->tablet_path() |
1193 | 0 | << " shutdown tablet path=" << tablet->tablet_path(); |
1194 | 0 | return true; |
1195 | 0 | } |
1196 | 0 | } |
1197 | 0 | } |
1198 | | |
1199 | 23 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
1200 | 23 | int64_t get_meta_ts = MonotonicMicros(); |
1201 | 23 | Status check_st = TabletMetaManager::get_meta(tablet->data_dir(), tablet->tablet_id(), |
1202 | 23 | tablet->schema_hash(), tablet_meta); |
1203 | 23 | if (check_st.ok()) { |
1204 | 23 | if (tablet_meta->tablet_state() != TABLET_SHUTDOWN || |
1205 | 23 | tablet_meta->tablet_uid() != tablet->tablet_uid()) { |
1206 | 0 | LOG(WARNING) << "tablet's state changed to normal, skip remove dirs" |
1207 | 0 | << " tablet id = " << tablet_meta->tablet_id() |
1208 | 0 | << " schema hash = " << tablet_meta->schema_hash() |
1209 | 0 | << " old tablet_uid=" << tablet->tablet_uid() |
1210 | 0 | << " cur tablet_uid=" << tablet_meta->tablet_uid(); |
1211 | 0 | return true; |
1212 | 0 | } |
1213 | | |
1214 | 23 | tablet->clear_cache(); |
1215 | | |
1216 | | // move data to trash |
1217 | 23 | const auto& tablet_path = tablet->tablet_path(); |
1218 | 23 | bool exists = false; |
1219 | 23 | Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
1220 | 23 | if (!exists_st) { |
1221 | 0 | return false; |
1222 | 0 | } |
1223 | 23 | if (exists) { |
1224 | | // take snapshot of tablet meta |
1225 | 23 | auto meta_file_path = fmt::format("{}/{}.hdr", tablet_path, tablet->tablet_id()); |
1226 | 23 | int64_t save_meta_ts = MonotonicMicros(); |
1227 | 23 | auto save_st = tablet->tablet_meta()->save(meta_file_path); |
1228 | 23 | if (!save_st.ok()) { |
1229 | 0 | LOG(WARNING) << "failed to save meta, tablet_id=" << tablet_meta->tablet_id() |
1230 | 0 | << ", tablet_uid=" << tablet_meta->tablet_uid() |
1231 | 0 | << ", error=" << save_st; |
1232 | 0 | return false; |
1233 | 0 | } |
1234 | 23 | int64_t now = MonotonicMicros(); |
1235 | 23 | LOG(INFO) << "start to move tablet to trash. " << tablet_path |
1236 | 23 | << ". rocksdb get meta cost " << (save_meta_ts - get_meta_ts) |
1237 | 23 | << " us, rocksdb save meta cost " << (now - save_meta_ts) << " us"; |
1238 | 23 | Status rm_st = tablet->data_dir()->move_to_trash(tablet_path); |
1239 | 23 | if (!rm_st.ok()) { |
1240 | 0 | LOG(WARNING) << "fail to move dir to trash. " << tablet_path; |
1241 | 0 | return false; |
1242 | 0 | } |
1243 | 23 | } |
1244 | | // remove tablet meta |
1245 | 23 | auto remove_st = TabletMetaManager::remove(tablet->data_dir(), tablet->tablet_id(), |
1246 | 23 | tablet->schema_hash()); |
1247 | 23 | if (!remove_st.ok()) { |
1248 | 0 | LOG(WARNING) << "failed to remove meta, tablet_id=" << tablet_meta->tablet_id() |
1249 | 0 | << ", tablet_uid=" << tablet_meta->tablet_uid() << ", error=" << remove_st; |
1250 | 0 | return false; |
1251 | 0 | } |
1252 | 23 | LOG(INFO) << "successfully move tablet to trash. " |
1253 | 23 | << "tablet_id=" << tablet->tablet_id() |
1254 | 23 | << ", schema_hash=" << tablet->schema_hash() << ", tablet_path=" << tablet_path; |
1255 | 23 | return true; |
1256 | 23 | } else { |
1257 | 0 | tablet->clear_cache(); |
1258 | | // if could not find tablet info in meta store, then check if dir existed |
1259 | 0 | const auto& tablet_path = tablet->tablet_path(); |
1260 | 0 | bool exists = false; |
1261 | 0 | Status exists_st = io::global_local_filesystem()->exists(tablet_path, &exists); |
1262 | 0 | if (!exists_st) { |
1263 | 0 | return false; |
1264 | 0 | } |
1265 | 0 | if (exists) { |
1266 | 0 | if (check_st.is<META_KEY_NOT_FOUND>()) { |
1267 | 0 | LOG(INFO) << "could not find tablet meta in rocksdb, so just delete it path " |
1268 | 0 | << "tablet_id=" << tablet->tablet_id() |
1269 | 0 | << ", schema_hash=" << tablet->schema_hash() |
1270 | 0 | << ", delete tablet_path=" << tablet_path; |
1271 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tablet_path)); |
1272 | 0 | RETURN_IF_ERROR(DataDir::delete_tablet_parent_path_if_empty(tablet_path)); |
1273 | 0 | return true; |
1274 | 0 | } |
1275 | 0 | LOG(WARNING) << "errors while load meta from store, skip this tablet. " |
1276 | 0 | << "tablet_id=" << tablet->tablet_id() |
1277 | 0 | << ", schema_hash=" << tablet->schema_hash(); |
1278 | 0 | return false; |
1279 | 0 | } else { |
1280 | 0 | LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. " |
1281 | 0 | << "tablet_id=" << tablet->tablet_id() |
1282 | 0 | << ", schema_hash=" << tablet->schema_hash() |
1283 | 0 | << ", tablet_path=" << tablet_path; |
1284 | 0 | return true; |
1285 | 0 | } |
1286 | 0 | } |
1287 | 23 | } |
1288 | | |
1289 | 82 | Status TabletManager::register_transition_tablet(int64_t tablet_id, std::string reason) { |
1290 | 82 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1291 | 82 | std::thread::id thread_id = std::this_thread::get_id(); |
1292 | 82 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1293 | 82 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1294 | 82 | search == shard.tablets_under_transition.end()) { |
1295 | | // not found |
1296 | 80 | shard.tablets_under_transition[tablet_id] = std::make_tuple(reason, thread_id, 1); |
1297 | 80 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1298 | 80 | << " lock times=1 thread_id_in_map=" << thread_id; |
1299 | 80 | return Status::OK(); |
1300 | 80 | } else { |
1301 | | // found |
1302 | 2 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1303 | 2 | if (thread_id != thread_id_in_map) { |
1304 | | // other thread, failed |
1305 | 0 | LOG(INFO) << "tablet_id = " << tablet_id << " is doing " << r |
1306 | 0 | << " thread_id_in_map=" << thread_id_in_map << " , add reason=" << reason |
1307 | 0 | << " thread_id=" << thread_id; |
1308 | 0 | return Status::InternalError<false>("{} failed try later, tablet_id={}", reason, |
1309 | 0 | tablet_id); |
1310 | 0 | } |
1311 | | // add lock times |
1312 | 2 | ++lock_times; |
1313 | 2 | LOG(INFO) << "add tablet_id= " << tablet_id << " to map, reason=" << reason |
1314 | 2 | << " lock times=" << lock_times << " thread_id_in_map=" << thread_id_in_map; |
1315 | 2 | return Status::OK(); |
1316 | 2 | } |
1317 | 82 | } |
1318 | | |
1319 | 82 | void TabletManager::unregister_transition_tablet(int64_t tablet_id, std::string reason) { |
1320 | 82 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1321 | 82 | std::thread::id thread_id = std::this_thread::get_id(); |
1322 | 82 | std::lock_guard<std::mutex> lk(shard.lock_for_transition); |
1323 | 82 | if (auto search = shard.tablets_under_transition.find(tablet_id); |
1324 | 82 | search == shard.tablets_under_transition.end()) { |
1325 | | // impossible, bug |
1326 | 0 | DCHECK(false) << "tablet " << tablet_id |
1327 | 0 | << " must be found, before unreg must have been reg"; |
1328 | 82 | } else { |
1329 | 82 | auto& [r, thread_id_in_map, lock_times] = search->second; |
1330 | 82 | if (thread_id_in_map != thread_id) { |
1331 | | // impossible, bug |
1332 | 0 | DCHECK(false) << "tablet " << tablet_id << " unreg thread must same reg thread"; |
1333 | 0 | } |
1334 | | // sub lock times |
1335 | 82 | --lock_times; |
1336 | 82 | if (lock_times != 0) { |
1337 | 2 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1338 | 2 | << " left=" << lock_times << " thread_id_in_map=" << thread_id_in_map; |
1339 | 80 | } else { |
1340 | 80 | LOG(INFO) << "erase tablet_id= " << tablet_id << " from map, reason=" << reason |
1341 | 80 | << " thread_id_in_map=" << thread_id_in_map; |
1342 | 80 | shard.tablets_under_transition.erase(tablet_id); |
1343 | 80 | } |
1344 | 82 | } |
1345 | 82 | } |
1346 | | |
1347 | | void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, |
1348 | | SchemaHash schema_hash, |
1349 | | const string& schema_hash_path, |
1350 | 10 | int16_t shard_id) { |
1351 | | // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks |
1352 | | // create tablet and load tablet task should check whether the dir exists |
1353 | 10 | tablets_shard& shard = _get_tablets_shard(tablet_id); |
1354 | 10 | std::shared_lock rdlock(shard.lock); |
1355 | | |
1356 | | // check if meta already exists |
1357 | 10 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
1358 | 10 | Status check_st = TabletMetaManager::get_meta(data_dir, tablet_id, schema_hash, tablet_meta); |
1359 | 10 | if (check_st.ok() && tablet_meta->shard_id() == shard_id) { |
1360 | 0 | return; |
1361 | 0 | } |
1362 | | |
1363 | 10 | LOG(INFO) << "tablet meta not exists, try delete tablet path " << schema_hash_path; |
1364 | | |
1365 | 10 | bool succ = register_transition_tablet(tablet_id, "path gc"); |
1366 | 10 | if (!succ) { |
1367 | 0 | return; |
1368 | 0 | } |
1369 | 10 | Defer defer {[&]() { unregister_transition_tablet(tablet_id, "path gc"); }}; |
1370 | | |
1371 | 10 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1372 | 10 | if (tablet != nullptr && tablet->tablet_path() == schema_hash_path) { |
1373 | 0 | LOG(INFO) << "tablet exists, skip delete the path " << schema_hash_path; |
1374 | 0 | return; |
1375 | 0 | } |
1376 | | |
1377 | | // TODO(ygl): may do other checks in the future |
1378 | 10 | bool exists = false; |
1379 | 10 | Status exists_st = io::global_local_filesystem()->exists(schema_hash_path, &exists); |
1380 | 10 | if (exists_st && exists) { |
1381 | 10 | LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path; |
1382 | 10 | Status rm_st = data_dir->move_to_trash(schema_hash_path); |
1383 | 10 | if (!rm_st.ok()) { |
1384 | 0 | LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path; |
1385 | 10 | } else { |
1386 | 10 | LOG(INFO) << "move path " << schema_hash_path << " to trash successfully"; |
1387 | 10 | } |
1388 | 10 | } |
1389 | 10 | } |
1390 | | |
1391 | | void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map, |
1392 | 0 | size_t* tablet_count) { |
1393 | 0 | DCHECK(tablet_count); |
1394 | 0 | *tablet_count = 0; |
1395 | 0 | auto filter = [path_map, tablet_count](Tablet* t) -> bool { |
1396 | 0 | ++(*tablet_count); |
1397 | 0 | auto iter = path_map->find(t->data_dir()->path()); |
1398 | 0 | return iter != path_map->end() && iter->second.is_used; |
1399 | 0 | }; |
1400 | |
|
1401 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1402 | 0 | auto& data_dir_info = (*path_map)[tablet->data_dir()->path()]; |
1403 | 0 | data_dir_info.local_used_capacity += tablet->tablet_local_size(); |
1404 | 0 | data_dir_info.remote_used_capacity += tablet->tablet_remote_size(); |
1405 | 0 | }; |
1406 | |
|
1407 | 0 | for_each_tablet(handler, filter); |
1408 | 0 | } |
1409 | | |
1410 | | void TabletManager::get_partition_related_tablets(int64_t partition_id, |
1411 | 0 | std::set<TabletInfo>* tablet_infos) { |
1412 | 0 | std::shared_lock rdlock(_partitions_lock); |
1413 | 0 | auto it = _partitions.find(partition_id); |
1414 | 0 | if (it != _partitions.end()) { |
1415 | 0 | *tablet_infos = it->second.tablets; |
1416 | 0 | } |
1417 | 0 | } |
1418 | | |
1419 | 0 | void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>* partitions_version) { |
1420 | 0 | std::shared_lock rdlock(_partitions_lock); |
1421 | 0 | for (const auto& [partition_id, partition] : _partitions) { |
1422 | 0 | partitions_version->insert( |
1423 | 0 | {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); |
1424 | 0 | } |
1425 | 0 | } |
1426 | | |
1427 | | void TabletManager::update_partitions_visible_version( |
1428 | 0 | const std::map<int64_t, int64_t>& partitions_version) { |
1429 | 0 | std::shared_lock rdlock(_partitions_lock); |
1430 | 0 | for (auto [partition_id, version] : partitions_version) { |
1431 | 0 | auto it = _partitions.find(partition_id); |
1432 | 0 | if (it != _partitions.end()) { |
1433 | 0 | it->second.visible_version->update_version_monoto(version); |
1434 | 0 | } |
1435 | 0 | } |
1436 | 0 | } |
1437 | | |
1438 | 0 | void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { |
1439 | 0 | auto filter = [data_dir](Tablet* tablet) -> bool { |
1440 | 0 | return tablet->tablet_state() == TABLET_RUNNING && |
1441 | 0 | tablet->data_dir()->path_hash() == data_dir->path_hash() && tablet->is_used() && |
1442 | 0 | tablet->init_succeeded(); |
1443 | 0 | }; |
1444 | |
|
1445 | 0 | std::vector<TabletSharedPtr> related_tablets = get_all_tablet(filter); |
1446 | 0 | int counter = 0; |
1447 | 0 | MonotonicStopWatch watch; |
1448 | 0 | watch.start(); |
1449 | 0 | for (TabletSharedPtr tablet : related_tablets) { |
1450 | 0 | if (tablet->do_tablet_meta_checkpoint()) { |
1451 | 0 | ++counter; |
1452 | 0 | } |
1453 | 0 | } |
1454 | 0 | int64_t cost = watch.elapsed_time() / 1000 / 1000; |
1455 | 0 | LOG(INFO) << "finish to do meta checkpoint on dir: " << data_dir->path() |
1456 | 0 | << ", number: " << counter << ", cost(ms): " << cost; |
1457 | 0 | } |
1458 | | |
1459 | | Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, |
1460 | | const bool is_schema_change, |
1461 | | const Tablet* base_tablet, |
1462 | 91 | TabletMetaSharedPtr* tablet_meta) { |
1463 | 91 | uint32_t next_unique_id = 0; |
1464 | 91 | std::unordered_map<uint32_t, uint32_t> col_idx_to_unique_id; |
1465 | 91 | if (!is_schema_change) { |
1466 | 1.32k | for (uint32_t col_idx = 0; col_idx < request.tablet_schema.columns.size(); ++col_idx) { |
1467 | 1.23k | col_idx_to_unique_id[col_idx] = col_idx; |
1468 | 1.23k | } |
1469 | 91 | next_unique_id = request.tablet_schema.columns.size(); |
1470 | 91 | } else { |
1471 | 0 | next_unique_id = base_tablet->next_unique_id(); |
1472 | 0 | auto& new_columns = request.tablet_schema.columns; |
1473 | 0 | for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) { |
1474 | 0 | const TColumn& column = new_columns[new_col_idx]; |
1475 | | // For schema change, compare old_tablet and new_tablet: |
1476 | | // 1. if column exist in both new_tablet and old_tablet, choose the column's |
1477 | | // unique_id in old_tablet to be the column's ordinal number in new_tablet |
1478 | | // 2. if column exists only in new_tablet, assign next_unique_id of old_tablet |
1479 | | // to the new column |
1480 | 0 | int32_t old_col_idx = base_tablet->tablet_schema()->field_index(column.column_name); |
1481 | 0 | if (old_col_idx != -1) { |
1482 | 0 | uint32_t old_unique_id = |
1483 | 0 | base_tablet->tablet_schema()->column(old_col_idx).unique_id(); |
1484 | 0 | col_idx_to_unique_id[new_col_idx] = old_unique_id; |
1485 | 0 | } else { |
1486 | | // Not exist in old tablet, it is a new added column |
1487 | 0 | col_idx_to_unique_id[new_col_idx] = next_unique_id++; |
1488 | 0 | } |
1489 | 0 | } |
1490 | 0 | } |
1491 | 91 | VLOG_NOTICE << "creating tablet meta. next_unique_id=" << next_unique_id; |
1492 | | |
1493 | | // We generate a new tablet_uid for this new tablet. |
1494 | 91 | uint64_t shard_id = store->get_shard(); |
1495 | 91 | *tablet_meta = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id, |
1496 | 91 | col_idx_to_unique_id); |
1497 | 91 | if (request.__isset.storage_format) { |
1498 | 52 | if (request.storage_format == TStorageFormat::DEFAULT) { |
1499 | 0 | (*tablet_meta)->set_preferred_rowset_type(_engine.default_rowset_type()); |
1500 | 52 | } else if (request.storage_format == TStorageFormat::V1) { |
1501 | 0 | (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); |
1502 | 52 | } else if (request.storage_format == TStorageFormat::V2) { |
1503 | 52 | (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); |
1504 | 52 | } else { |
1505 | 0 | return Status::Error<CE_CMD_PARAMS_ERROR>("invalid TStorageFormat: {}", |
1506 | 0 | request.storage_format); |
1507 | 0 | } |
1508 | 52 | } |
1509 | 91 | return Status::OK(); |
1510 | 91 | } |
1511 | | |
1512 | 2.51k | TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { |
1513 | 2.51k | VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id; |
1514 | 2.51k | tablet_map_t& tablet_map = _get_tablet_map(tablet_id); |
1515 | 2.51k | const auto& iter = tablet_map.find(tablet_id); |
1516 | 2.51k | if (iter != tablet_map.end()) { |
1517 | 2.37k | return iter->second; |
1518 | 2.37k | } |
1519 | 141 | return nullptr; |
1520 | 2.51k | } |
1521 | | |
1522 | 93 | void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { |
1523 | 93 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1524 | 93 | auto& partition = _partitions[tablet->partition_id()]; |
1525 | 93 | partition.tablets.insert(tablet->get_tablet_info()); |
1526 | 93 | tablet->set_visible_version( |
1527 | 93 | std::static_pointer_cast<const VersionWithTime>(partition.visible_version)); |
1528 | 93 | } |
1529 | | |
1530 | 46 | void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { |
1531 | 46 | tablet->set_visible_version(nullptr); |
1532 | 46 | std::lock_guard<std::shared_mutex> wrlock(_partitions_lock); |
1533 | 46 | auto it = _partitions.find(tablet->partition_id()); |
1534 | 46 | if (it == _partitions.end()) { |
1535 | 0 | return; |
1536 | 0 | } |
1537 | | |
1538 | 46 | auto& tablets = it->second.tablets; |
1539 | 46 | tablets.erase(tablet->get_tablet_info()); |
1540 | 46 | if (tablets.empty()) { |
1541 | 26 | _partitions.erase(it); |
1542 | 26 | } |
1543 | 46 | } |
1544 | | |
1545 | | void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info, |
1546 | 0 | int64_t num) { |
1547 | 0 | for (const auto& tablets_shard : _tablets_shards) { |
1548 | 0 | std::shared_lock rdlock(tablets_shard.lock); |
1549 | 0 | for (const auto& item : tablets_shard.tablet_map) { |
1550 | 0 | TabletSharedPtr tablet = item.second; |
1551 | 0 | if (tablets_info.size() >= num) { |
1552 | 0 | return; |
1553 | 0 | } |
1554 | 0 | if (tablet == nullptr) { |
1555 | 0 | continue; |
1556 | 0 | } |
1557 | 0 | tablets_info.push_back(tablet->get_tablet_info()); |
1558 | 0 | } |
1559 | 0 | } |
1560 | 0 | } |
1561 | | |
1562 | 2.41k | std::shared_mutex& TabletManager::_get_tablets_shard_lock(TTabletId tabletId) { |
1563 | 2.41k | return _get_tablets_shard(tabletId).lock; |
1564 | 2.41k | } |
1565 | | |
1566 | 2.76k | TabletManager::tablet_map_t& TabletManager::_get_tablet_map(TTabletId tabletId) { |
1567 | 2.76k | return _get_tablets_shard(tabletId).tablet_map; |
1568 | 2.76k | } |
1569 | | |
1570 | 5.35k | TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tabletId) { |
1571 | 5.35k | return _tablets_shards[tabletId & _tablets_shards_mask]; |
1572 | 5.35k | } |
1573 | | |
1574 | | void TabletManager::get_tablets_distribution_on_different_disks( |
1575 | | std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk, |
1576 | 0 | std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk) { |
1577 | 0 | std::vector<DataDir*> data_dirs = _engine.get_stores(); |
1578 | 0 | std::map<int64_t, Partition> partitions; |
1579 | 0 | { |
1580 | | // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. |
1581 | | // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and |
1582 | | // copy _partitions here. |
1583 | 0 | std::shared_lock rdlock(_partitions_lock); |
1584 | 0 | partitions = _partitions; |
1585 | 0 | } |
1586 | |
|
1587 | 0 | for (const auto& [partition_id, partition] : partitions) { |
1588 | 0 | std::map<DataDir*, int64_t> tablets_num; |
1589 | 0 | std::map<DataDir*, std::vector<TabletSize>> tablets_info; |
1590 | 0 | for (auto* data_dir : data_dirs) { |
1591 | 0 | tablets_num[data_dir] = 0; |
1592 | 0 | } |
1593 | |
|
1594 | 0 | for (const auto& tablet_info : partition.tablets) { |
1595 | | // get_tablet() will hold 'tablet_shard_lock' |
1596 | 0 | TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); |
1597 | 0 | if (tablet == nullptr) { |
1598 | 0 | continue; |
1599 | 0 | } |
1600 | 0 | DataDir* data_dir = tablet->data_dir(); |
1601 | 0 | size_t tablet_footprint = tablet->tablet_footprint(); |
1602 | 0 | tablets_num[data_dir]++; |
1603 | 0 | TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint); |
1604 | 0 | tablets_info[data_dir].push_back(tablet_size); |
1605 | 0 | } |
1606 | 0 | tablets_num_on_disk[partition_id] = tablets_num; |
1607 | 0 | tablets_info_on_disk[partition_id] = tablets_info; |
1608 | 0 | } |
1609 | 0 | } |
1610 | | |
1611 | | struct SortCtx { |
1612 | | SortCtx(TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t cooldown_timestamp, |
1613 | | int64_t file_size) |
1614 | 0 | : tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {} |
1615 | | TabletSharedPtr tablet; |
1616 | | RowsetSharedPtr rowset; |
1617 | | // to ensure the tablet with -1 would always be greater than other |
1618 | | uint64_t cooldown_timestamp; |
1619 | | int64_t file_size; |
1620 | 0 | bool operator<(const SortCtx& other) const { |
1621 | 0 | if (this->cooldown_timestamp == other.cooldown_timestamp) { |
1622 | 0 | return this->file_size > other.file_size; |
1623 | 0 | } |
1624 | 0 | return this->cooldown_timestamp < other.cooldown_timestamp; |
1625 | 0 | } |
1626 | | }; |
1627 | | |
1628 | | void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets, |
1629 | | std::vector<RowsetSharedPtr>* rowsets, |
1630 | 0 | std::function<bool(const TabletSharedPtr&)> skip_tablet) { |
1631 | 0 | std::vector<SortCtx> sort_ctx_vec; |
1632 | 0 | std::vector<std::weak_ptr<Tablet>> candidates; |
1633 | 0 | for_each_tablet([&](const TabletSharedPtr& tablet) { candidates.emplace_back(tablet); }, |
1634 | 0 | filter_all_tablets); |
1635 | 0 | auto get_cooldown_tablet = [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) { |
1636 | 0 | const TabletSharedPtr& tablet = t.lock(); |
1637 | 0 | RowsetSharedPtr rowset = nullptr; |
1638 | 0 | if (UNLIKELY(nullptr == tablet)) { |
1639 | 0 | return; |
1640 | 0 | } |
1641 | 0 | int64_t cooldown_timestamp = -1; |
1642 | 0 | size_t file_size = -1; |
1643 | 0 | if (!skip_tablet(tablet) && |
1644 | 0 | (rowset = tablet->need_cooldown(&cooldown_timestamp, &file_size))) { |
1645 | 0 | sort_ctx_vec.emplace_back(tablet, rowset, cooldown_timestamp, file_size); |
1646 | 0 | } |
1647 | 0 | }; |
1648 | 0 | std::for_each(candidates.begin(), candidates.end(), get_cooldown_tablet); |
1649 | |
|
1650 | 0 | std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end()); |
1651 | |
|
1652 | 0 | for (SortCtx& ctx : sort_ctx_vec) { |
1653 | 0 | VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id(); |
1654 | 0 | tablets->push_back(std::move(ctx.tablet)); |
1655 | 0 | rowsets->push_back(std::move(ctx.rowset)); |
1656 | 0 | } |
1657 | 0 | } |
1658 | | |
1659 | 0 | void TabletManager::get_all_tablets_storage_format(TCheckStorageFormatResult* result) { |
1660 | 0 | DCHECK(result != nullptr); |
1661 | 0 | auto handler = [result](const TabletSharedPtr& tablet) { |
1662 | 0 | if (tablet->all_beta()) { |
1663 | 0 | result->v2_tablets.push_back(tablet->tablet_id()); |
1664 | 0 | } else { |
1665 | 0 | result->v1_tablets.push_back(tablet->tablet_id()); |
1666 | 0 | } |
1667 | 0 | }; |
1668 | |
|
1669 | 0 | for_each_tablet(handler, filter_all_tablets); |
1670 | 0 | result->__isset.v1_tablets = true; |
1671 | 0 | result->__isset.v2_tablets = true; |
1672 | 0 | } |
1673 | | |
1674 | 0 | std::set<int64_t> TabletManager::check_all_tablet_segment(bool repair) { |
1675 | 0 | std::set<int64_t> bad_tablets; |
1676 | 0 | std::map<int64_t, std::vector<int64_t>> repair_shard_bad_tablets; |
1677 | 0 | auto handler = [&](const TabletSharedPtr& tablet) { |
1678 | 0 | if (!tablet->check_all_rowset_segment()) { |
1679 | 0 | int64_t tablet_id = tablet->tablet_id(); |
1680 | 0 | bad_tablets.insert(tablet_id); |
1681 | 0 | if (repair) { |
1682 | 0 | repair_shard_bad_tablets[tablet_id & _tablets_shards_mask].push_back(tablet_id); |
1683 | 0 | } |
1684 | 0 | } |
1685 | 0 | }; |
1686 | 0 | for_each_tablet(handler, filter_all_tablets); |
1687 | |
|
1688 | 0 | for (const auto& [shard_index, shard_tablets] : repair_shard_bad_tablets) { |
1689 | 0 | auto& tablets_shard = _tablets_shards[shard_index]; |
1690 | 0 | auto& tablet_map = tablets_shard.tablet_map; |
1691 | 0 | std::lock_guard<std::shared_mutex> wrlock(tablets_shard.lock); |
1692 | 0 | for (auto tablet_id : shard_tablets) { |
1693 | 0 | auto it = tablet_map.find(tablet_id); |
1694 | 0 | if (it == tablet_map.end()) { |
1695 | 0 | bad_tablets.erase(tablet_id); |
1696 | 0 | LOG(WARNING) << "Bad tablet has be removed. tablet_id=" << tablet_id; |
1697 | 0 | } else { |
1698 | 0 | const auto& tablet = it->second; |
1699 | 0 | static_cast<void>(tablet->set_tablet_state(TABLET_SHUTDOWN)); |
1700 | 0 | tablet->save_meta(); |
1701 | 0 | { |
1702 | 0 | std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock( |
1703 | 0 | _shutdown_tablets_lock); |
1704 | 0 | _shutdown_tablets.push_back(tablet); |
1705 | 0 | } |
1706 | 0 | LOG(WARNING) << "There are some segments lost, set tablet to shutdown state." |
1707 | 0 | << "tablet_id=" << tablet->tablet_id() |
1708 | 0 | << ", tablet_path=" << tablet->tablet_path(); |
1709 | 0 | } |
1710 | 0 | } |
1711 | 0 | } |
1712 | |
|
1713 | 0 | return bad_tablets; |
1714 | 0 | } |
1715 | | |
1716 | | bool TabletManager::update_tablet_partition_id(::doris::TPartitionId partition_id, |
1717 | 0 | ::doris::TTabletId tablet_id) { |
1718 | 0 | std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id)); |
1719 | 0 | TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id); |
1720 | 0 | if (tablet == nullptr) { |
1721 | 0 | LOG(WARNING) << "get tablet err partition_id: " << partition_id |
1722 | 0 | << " tablet_id:" << tablet_id; |
1723 | 0 | return false; |
1724 | 0 | } |
1725 | 0 | _remove_tablet_from_partition(tablet); |
1726 | 0 | auto st = tablet->tablet_meta()->set_partition_id(partition_id); |
1727 | 0 | if (!st.ok()) { |
1728 | 0 | LOG(WARNING) << "set partition id err partition_id: " << partition_id |
1729 | 0 | << " tablet_id:" << tablet_id; |
1730 | 0 | return false; |
1731 | 0 | } |
1732 | 0 | _add_tablet_to_partition(tablet); |
1733 | 0 | return true; |
1734 | 0 | } |
1735 | | |
1736 | | } // end namespace doris |