be/src/storage/rowset/beta_rowset.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <ctype.h> |
22 | | #include <errno.h> |
23 | | #include <fmt/format.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <filesystem> |
27 | | #include <memory> |
28 | | #include <ostream> |
29 | | #include <utility> |
30 | | |
31 | | #include "cloud/config.h" |
32 | | #include "common/config.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "cpp/sync_point.h" |
37 | | #include "io/fs/file_reader.h" |
38 | | #include "io/fs/file_system.h" |
39 | | #include "io/fs/local_file_system.h" |
40 | | #include "io/fs/path.h" |
41 | | #include "io/fs/remote_file_system.h" |
42 | | #include "storage/index/index_file_reader.h" |
43 | | #include "storage/index/inverted/inverted_index_cache.h" |
44 | | #include "storage/index/inverted/inverted_index_desc.h" |
45 | | #include "storage/olap_common.h" |
46 | | #include "storage/olap_define.h" |
47 | | #include "storage/rowset/beta_rowset.h" |
48 | | #include "storage/rowset/beta_rowset_reader.h" |
49 | | #include "storage/rowset/rowset.h" |
50 | | #include "storage/segment/segment_loader.h" |
51 | | #include "storage/tablet/tablet_schema.h" |
52 | | #include "storage/utils.h" |
53 | | #include "util/debug_points.h" |
54 | | |
55 | | namespace doris { |
56 | | using namespace ErrorCode; |
57 | | |
58 | | std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path, |
59 | | const RowsetId& rowset_id, int64_t begin, |
60 | 34 | int64_t end) { |
61 | | // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat |
62 | 34 | return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end); |
63 | 34 | } |
64 | | |
65 | | BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta, |
66 | | std::string tablet_path) |
67 | 1.03M | : Rowset(schema, rowset_meta, std::move(tablet_path)) {} |
68 | | |
69 | 573k | BetaRowset::~BetaRowset() = default; |
70 | | |
71 | 1.02M | Status BetaRowset::init() { |
72 | 1.02M | return Status::OK(); // no op |
73 | 1.02M | } |
74 | | |
75 | | namespace { |
76 | | Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset, |
77 | | std::vector<uint32_t>* segment_rows, bool enable_segment_cache, |
78 | 35.0k | OlapReaderStatistics* read_stats) { |
79 | 35.0k | SegmentCacheHandle segment_cache_handle; |
80 | 35.0k | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
81 | 35.0k | rowset, &segment_cache_handle, enable_segment_cache, false, read_stats)); |
82 | 35.0k | for (const auto& segment : segment_cache_handle.get_segments()) { |
83 | 35.0k | segment_rows->emplace_back(segment->num_rows()); |
84 | 35.0k | } |
85 | 35.0k | return Status::OK(); |
86 | 35.0k | } |
87 | | |
88 | | Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta, |
89 | | const std::vector<uint32_t>& rows_from_footer, |
90 | 35.0k | int64_t tablet_id, const std::string& rowset_id) { |
91 | 35.0k | DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size()); |
92 | 70.2k | for (size_t i = 0; i < rows_from_footer.size(); i++) { |
93 | 35.1k | if (rows_from_footer[i] != rows_from_meta[i]) { |
94 | 0 | auto msg = fmt::format( |
95 | 0 | "segment rows mismatch between rowset meta and segment footer. " |
96 | 0 | "segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}", |
97 | 0 | i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id); |
98 | 0 | if (config::enable_segment_rows_check_core) { |
99 | 0 | CHECK(false) << msg; |
100 | 0 | } |
101 | 0 | return Status::InternalError(msg); |
102 | 0 | } |
103 | 35.1k | } |
104 | 35.0k | return Status::OK(); |
105 | 35.0k | } |
106 | | } // namespace |
107 | | |
108 | | Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows, |
109 | | bool enable_segment_cache, |
110 | 3.48M | OlapReaderStatistics* read_stats) { |
111 | 3.48M | #ifndef BE_TEST |
112 | | // `ROWSET_UNLOADING` is state for closed() called but owned by some readers. |
113 | | // So here `ROWSET_UNLOADING` is allowed. |
114 | 3.48M | DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED); |
115 | 3.48M | #endif |
116 | 3.48M | RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] { |
117 | 3.48M | auto segment_count = num_segments(); |
118 | 3.48M | if (segment_count == 0) { |
119 | 3.48M | return Status::OK(); |
120 | 3.48M | } |
121 | | |
122 | 3.48M | if (!_rowset_meta->get_num_segment_rows().empty()) { |
123 | 3.48M | if (_rowset_meta->get_num_segment_rows().size() == segment_count) { |
124 | | // use segment rows in rowset meta if eligible |
125 | 3.48M | TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta"); |
126 | 3.48M | _segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(), |
127 | 3.48M | _rowset_meta->get_num_segment_rows().cend()); |
128 | 3.48M | if (config::enable_segment_rows_consistency_check) { |
129 | | // verify segment rows from meta match segment footer |
130 | 3.48M | std::vector<uint32_t> rows_from_footer; |
131 | 3.48M | auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this()); |
132 | 3.48M | auto load_status = load_segment_rows_from_footer( |
133 | 3.48M | self, &rows_from_footer, enable_segment_cache, read_stats); |
134 | 3.48M | if (load_status.ok()) { |
135 | 3.48M | return check_segment_rows_consistency( |
136 | 3.48M | _segments_rows, rows_from_footer, _rowset_meta->tablet_id(), |
137 | 3.48M | _rowset_meta->rowset_id().to_string()); |
138 | 3.48M | } |
139 | 3.48M | } |
140 | 3.48M | return Status::OK(); |
141 | 3.48M | } else { |
142 | 3.48M | auto msg = fmt::format( |
143 | 3.48M | "[verbose] corrupted segment rows info in rowset meta. " |
144 | 3.48M | "segment count: {}, segment rows size: {}, tablet={}, rowset={}", |
145 | 3.48M | segment_count, _rowset_meta->get_num_segment_rows().size(), |
146 | 3.48M | _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string()); |
147 | 3.48M | if (config::enable_segment_rows_check_core) { |
148 | 3.48M | CHECK(false) << msg; |
149 | 3.48M | } |
150 | 3.48M | LOG_EVERY_SECOND(WARNING) << msg; |
151 | 3.48M | } |
152 | 3.48M | } |
153 | 3.48M | if (config::fail_when_segment_rows_not_in_rowset_meta) { |
154 | 3.48M | CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet=" |
155 | 3.48M | << _rowset_meta->tablet_id() |
156 | 3.48M | << ", rowset=" << _rowset_meta->rowset_id().to_string() |
157 | 3.48M | << ", version=" << _rowset_meta->version() |
158 | 3.48M | << ", debug_string=" << _rowset_meta->debug_string() |
159 | 3.48M | << ", stack=" << Status::InternalError("error"); |
160 | 3.48M | } |
161 | | // otherwise, read it from segment footer |
162 | 3.48M | TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer"); |
163 | 3.48M | auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this()); |
164 | 3.48M | return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache, |
165 | 3.48M | read_stats); |
166 | 3.48M | })); |
167 | 3.48M | segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); |
168 | 3.48M | return Status::OK(); |
169 | 3.48M | } |
170 | | |
171 | 21 | Status BetaRowset::get_inverted_index_size(int64_t* index_size) { |
172 | 21 | const auto& fs = _rowset_meta->fs(); |
173 | 21 | if (!fs) { |
174 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
175 | 0 | _rowset_meta->resource_id()); |
176 | 0 | } |
177 | | |
178 | 21 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
179 | 7 | for (const auto& index : _schema->inverted_indexes()) { |
180 | 15 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
181 | 9 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
182 | 9 | int64_t file_size = 0; |
183 | | |
184 | 9 | std::string inverted_index_file_path = |
185 | 9 | InvertedIndexDescriptor::get_index_file_path_v1( |
186 | 9 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
187 | 9 | index->index_id(), index->get_index_suffix()); |
188 | 9 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
189 | 9 | *index_size += file_size; |
190 | 9 | } |
191 | 6 | } |
192 | 14 | } else { |
193 | 20 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
194 | 14 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
195 | 14 | int64_t file_size = 0; |
196 | | |
197 | 14 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
198 | 14 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
199 | 14 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
200 | 6 | *index_size += file_size; |
201 | 6 | } |
202 | 14 | } |
203 | 13 | return Status::OK(); |
204 | 21 | } |
205 | | |
206 | 216k | void BetaRowset::clear_inverted_index_cache() { |
207 | 301k | for (int i = 0; i < num_segments(); ++i) { |
208 | 84.3k | auto seg_path = segment_path(i); |
209 | 84.3k | if (!seg_path) { |
210 | 0 | continue; |
211 | 0 | } |
212 | | |
213 | 84.3k | auto index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path); |
214 | 805k | for (const auto& column : tablet_schema()->columns()) { |
215 | 805k | auto index_metas = tablet_schema()->inverted_indexs(*column); |
216 | 805k | for (const auto& index_meta : index_metas) { |
217 | 11.8k | auto inverted_index_file_cache_key = |
218 | 11.8k | InvertedIndexDescriptor::get_index_file_cache_key( |
219 | 11.8k | index_path_prefix, index_meta->index_id(), |
220 | 11.8k | index_meta->get_index_suffix()); |
221 | 11.8k | (void)segment_v2::InvertedIndexSearcherCache::instance()->erase( |
222 | 11.8k | inverted_index_file_cache_key); |
223 | 11.8k | } |
224 | 805k | } |
225 | 84.3k | } |
226 | 216k | } |
227 | | |
228 | 737 | Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) { |
229 | 737 | auto fs = _rowset_meta->fs(); |
230 | 737 | if (!fs) { |
231 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
232 | 0 | _rowset_meta->resource_id()); |
233 | 0 | } |
234 | | |
235 | 1.48k | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
236 | 745 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
237 | 745 | int64_t file_size; |
238 | 745 | RETURN_IF_ERROR(fs->file_size(seg_path, &file_size)); |
239 | 744 | segments_size->push_back(file_size); |
240 | 744 | } |
241 | 736 | return Status::OK(); |
242 | 737 | } |
243 | | |
244 | 181k | Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) { |
245 | 181k | return load_segments(0, num_segments(), segments); |
246 | 181k | } |
247 | | |
248 | | Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, |
249 | 214k | std::vector<segment_v2::SegmentSharedPtr>* segments) { |
250 | 214k | int64_t seg_id = seg_id_begin; |
251 | 363k | while (seg_id < seg_id_end) { |
252 | 148k | std::shared_ptr<segment_v2::Segment> segment; |
253 | 148k | RETURN_IF_ERROR(load_segment(seg_id, nullptr, &segment)); |
254 | 148k | segments->push_back(std::move(segment)); |
255 | 148k | seg_id++; |
256 | 148k | } |
257 | 214k | return Status::OK(); |
258 | 214k | } |
259 | | |
260 | | Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, |
261 | 245k | segment_v2::SegmentSharedPtr* segment) { |
262 | 245k | auto fs = _rowset_meta->fs(); |
263 | 245k | if (!fs) { |
264 | 1 | return Status::Error<INIT_FAILED>("get fs failed"); |
265 | 1 | } |
266 | | |
267 | 245k | DCHECK(seg_id >= 0); |
268 | 245k | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
269 | 245k | io::FileReaderOptions reader_options; |
270 | 245k | reader_options.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
271 | 245k | : io::FileCachePolicy::NO_CACHE; |
272 | 245k | reader_options.is_doris_table = true; |
273 | 245k | reader_options.file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)); |
274 | 245k | reader_options.tablet_id = _rowset_meta->tablet_id(); |
275 | 245k | reader_options.storage_resource_id = _rowset_meta->resource_id(); |
276 | | |
277 | 245k | auto s = segment_v2::Segment::open( |
278 | 245k | fs, seg_path, _rowset_meta->tablet_id(), static_cast<uint32_t>(seg_id), rowset_id(), |
279 | 245k | _schema, reader_options, segment, |
280 | 245k | _rowset_meta->inverted_index_file_info(static_cast<int>(seg_id)), stats); |
281 | 245k | if (!s.ok()) { |
282 | 5 | LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() |
283 | 5 | << " : " << s.to_string(); |
284 | 5 | return s; |
285 | 5 | } |
286 | 245k | return Status::OK(); |
287 | 245k | } |
288 | | |
289 | 5.84M | Status BetaRowset::create_reader(RowsetReaderSharedPtr* result) { |
290 | | // NOTE: We use std::static_pointer_cast for performance |
291 | 5.84M | result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()))); |
292 | 5.84M | return Status::OK(); |
293 | 5.84M | } |
294 | | |
295 | 37.6k | Status BetaRowset::remove() { |
296 | 37.6k | if (!is_local()) { |
297 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
298 | 0 | return Status::OK(); |
299 | 0 | } |
300 | | |
301 | | // TODO should we close and remove all segment reader first? |
302 | 37.6k | VLOG_NOTICE << "begin to remove files in rowset " << rowset_id() |
303 | 6 | << ", version:" << start_version() << "-" << end_version() |
304 | 6 | << ", tabletid:" << _rowset_meta->tablet_id(); |
305 | | // If the rowset was removed, it need to remove the fds in segment cache directly |
306 | 37.6k | clear_cache(); |
307 | | |
308 | 37.6k | bool success = true; |
309 | 37.6k | Status st; |
310 | 37.6k | const auto& fs = io::global_local_filesystem(); |
311 | 56.1k | for (int i = 0; i < num_segments(); ++i) { |
312 | 18.4k | auto seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
313 | 18.4k | LOG(INFO) << "deleting " << seg_path; |
314 | 18.4k | st = fs->delete_file(seg_path); |
315 | 18.4k | if (!st.ok()) { |
316 | 0 | LOG(WARNING) << st.to_string(); |
317 | 0 | success = false; |
318 | 0 | } |
319 | | |
320 | 18.4k | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
321 | 0 | for (const auto& column : _schema->columns()) { |
322 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
323 | 0 | for (const auto& index_meta : index_metas) { |
324 | 0 | std::string inverted_index_file = |
325 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
326 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
327 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
328 | 0 | st = fs->delete_file(inverted_index_file); |
329 | 0 | if (!st.ok()) { |
330 | 0 | LOG(WARNING) << st.to_string(); |
331 | 0 | success = false; |
332 | 0 | } |
333 | 0 | } |
334 | 0 | } |
335 | 18.4k | } else { |
336 | 18.4k | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
337 | 104 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
338 | 104 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
339 | 104 | st = fs->delete_file(inverted_index_file); |
340 | 104 | if (!st.ok()) { |
341 | 0 | LOG(WARNING) << st.to_string(); |
342 | 0 | success = false; |
343 | 0 | } |
344 | 104 | } |
345 | 18.4k | } |
346 | 18.4k | } |
347 | 37.6k | if (!success) { |
348 | 0 | return Status::Error<ROWSET_DELETE_FILE_FAILED>("failed to remove files in rowset {}", |
349 | 0 | rowset_id().to_string()); |
350 | 0 | } |
351 | 37.6k | return Status::OK(); |
352 | 37.6k | } |
353 | | |
354 | 32.2k | void BetaRowset::do_close() { |
355 | | // do nothing. |
356 | 32.2k | } |
357 | | |
358 | | Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, |
359 | | size_t new_rowset_start_seg_id, |
360 | 3.95k | std::set<int64_t>* without_index_uids) { |
361 | 3.95k | if (!is_local()) { |
362 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
363 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
364 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
365 | 0 | } |
366 | | |
367 | 3.95k | const auto& local_fs = io::global_local_filesystem(); |
368 | 3.95k | Status status; |
369 | 3.95k | std::vector<std::string> linked_success_files; |
370 | 3.95k | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
371 | 3.95k | if (!status.ok()) { |
372 | 0 | LOG(WARNING) << "will delete linked success files due to error " << status; |
373 | 0 | std::vector<io::Path> paths; |
374 | 0 | for (auto& file : linked_success_files) { |
375 | 0 | paths.emplace_back(file); |
376 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
377 | 0 | } |
378 | 0 | static_cast<void>(local_fs->batch_delete(paths)); |
379 | 0 | LOG(WARNING) << "done delete linked success files due to error " << status; |
380 | 0 | } |
381 | 3.95k | }}; |
382 | | |
383 | 4.08k | for (int i = 0; i < num_segments(); ++i) { |
384 | 129 | auto dst_path = |
385 | 129 | local_segment_path(dir, new_rowset_id.to_string(), i + new_rowset_start_seg_id); |
386 | 129 | bool dst_path_exist = false; |
387 | 129 | if (!local_fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { |
388 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
389 | 0 | "failed to create hard link, file already exist: {}", dst_path); |
390 | 0 | return status; |
391 | 0 | } |
392 | 129 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
393 | | // TODO(lingbin): how external storage support link? |
394 | | // use copy? or keep refcount to avoid being delete? |
395 | 129 | if (!local_fs->link_file(src_path, dst_path).ok()) { |
396 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
397 | 0 | src_path, dst_path, Errno::no()); |
398 | 0 | return status; |
399 | 0 | } |
400 | 129 | linked_success_files.push_back(dst_path); |
401 | 129 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
402 | 129 | status = Status::Error<OS_ERROR>("fault_inject link_file error"); |
403 | 129 | return status; |
404 | 129 | }); |
405 | 129 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
406 | 40 | for (const auto& index : _schema->inverted_indexes()) { |
407 | 9 | auto index_id = index->index_id(); |
408 | 9 | if (without_index_uids != nullptr && without_index_uids->count(index_id)) { |
409 | 1 | continue; |
410 | 1 | } |
411 | 8 | std::string inverted_index_src_file_path = |
412 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
413 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
414 | 8 | index_id, index->get_index_suffix()); |
415 | 8 | std::string inverted_index_dst_file_path = |
416 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
417 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
418 | 8 | index_id, index->get_index_suffix()); |
419 | 8 | bool index_file_exists = true; |
420 | 8 | RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists)); |
421 | 8 | if (index_file_exists) { |
422 | 8 | DBUG_EXECUTE_IF( |
423 | 8 | "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
424 | 8 | status = Status::Error<OS_ERROR>( |
425 | 8 | "fault_inject link_file error from={}, to={}", |
426 | 8 | inverted_index_src_file_path, inverted_index_dst_file_path); |
427 | 8 | return status; |
428 | 8 | }); |
429 | 8 | if (!local_fs->link_file(inverted_index_src_file_path, |
430 | 8 | inverted_index_dst_file_path) |
431 | 8 | .ok()) { |
432 | 0 | status = Status::Error<OS_ERROR>( |
433 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
434 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path, |
435 | 0 | Errno::no()); |
436 | 0 | return status; |
437 | 0 | } |
438 | 8 | linked_success_files.push_back(inverted_index_dst_file_path); |
439 | 8 | LOG(INFO) << "success to create hard link. from=" |
440 | 8 | << inverted_index_src_file_path << ", " |
441 | 8 | << "to=" << inverted_index_dst_file_path; |
442 | 8 | } else { |
443 | 0 | LOG(WARNING) << "skip create hard link to not existed index file=" |
444 | 0 | << inverted_index_src_file_path; |
445 | 0 | } |
446 | 8 | } |
447 | 89 | } else { |
448 | 89 | if ((_schema->has_inverted_index() || _schema->has_ann_index()) && |
449 | 89 | (without_index_uids == nullptr || without_index_uids->empty())) { |
450 | 0 | std::string inverted_index_file_src = |
451 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
452 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
453 | 0 | std::string inverted_index_file_dst = |
454 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
455 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
456 | 0 | bool index_dst_path_exist = false; |
457 | |
|
458 | 0 | if (!local_fs->exists(inverted_index_file_dst, &index_dst_path_exist).ok() || |
459 | 0 | index_dst_path_exist) { |
460 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
461 | 0 | "failed to create hard link, file already exist: {}", |
462 | 0 | inverted_index_file_dst); |
463 | 0 | return status; |
464 | 0 | } |
465 | 0 | if (!local_fs->link_file(inverted_index_file_src, inverted_index_file_dst).ok()) { |
466 | 0 | status = Status::Error<OS_ERROR>( |
467 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
468 | 0 | inverted_index_file_src, inverted_index_file_dst, Errno::no()); |
469 | 0 | return status; |
470 | 0 | } |
471 | 0 | linked_success_files.push_back(inverted_index_file_dst); |
472 | 0 | } |
473 | 89 | } |
474 | 129 | } |
475 | 3.95k | return Status::OK(); |
476 | 3.95k | } |
477 | | |
478 | 4 | Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) { |
479 | 4 | if (!is_local()) { |
480 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
481 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
482 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
483 | 0 | } |
484 | | |
485 | 4 | bool exists = false; |
486 | 4 | for (int i = 0; i < num_segments(); ++i) { |
487 | 0 | auto dst_path = local_segment_path(dir, new_rowset_id.to_string(), i); |
488 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(dst_path, &exists)); |
489 | 0 | if (exists) { |
490 | 0 | return Status::Error<FILE_ALREADY_EXIST>("file already exist: {}", dst_path); |
491 | 0 | } |
492 | 0 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
493 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(src_path, dst_path)); |
494 | 0 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
495 | 0 | for (const auto& column : _schema->columns()) { |
496 | | // if (column.has_inverted_index()) { |
497 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
498 | 0 | for (const auto& index_meta : index_metas) { |
499 | 0 | std::string inverted_index_src_file_path = |
500 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
501 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
502 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
503 | 0 | std::string inverted_index_dst_file_path = |
504 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
505 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
506 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
507 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path( |
508 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path)); |
509 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path |
510 | 0 | << ", " |
511 | 0 | << "to=" << inverted_index_dst_file_path; |
512 | 0 | } |
513 | 0 | } |
514 | 0 | } else { |
515 | 0 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
516 | 0 | std::string inverted_index_src_file = |
517 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
518 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
519 | 0 | std::string inverted_index_dst_file = |
520 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
521 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
522 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(inverted_index_src_file, |
523 | 0 | inverted_index_dst_file)); |
524 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file << ", " |
525 | 0 | << "to=" << inverted_index_dst_file; |
526 | 0 | } |
527 | 0 | } |
528 | 0 | } |
529 | 4 | return Status::OK(); |
530 | 4 | } |
531 | | |
532 | 5 | Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) { |
533 | 5 | if (!is_local()) { |
534 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
535 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
536 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
537 | 0 | } |
538 | | |
539 | 5 | if (num_segments() < 1) { |
540 | 3 | return Status::OK(); |
541 | 3 | } |
542 | 2 | std::vector<io::Path> local_paths; |
543 | 2 | local_paths.reserve(num_segments()); |
544 | 2 | std::vector<io::Path> dest_paths; |
545 | 2 | dest_paths.reserve(num_segments()); |
546 | 4 | for (int i = 0; i < num_segments(); ++i) { |
547 | | // Note: Here we use relative path for remote. |
548 | 2 | auto remote_seg_path = dest_fs.remote_segment_path(_rowset_meta->tablet_id(), |
549 | 2 | new_rowset_id.to_string(), i); |
550 | 2 | auto local_seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
551 | 2 | dest_paths.emplace_back(remote_seg_path); |
552 | 2 | local_paths.emplace_back(local_seg_path); |
553 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
554 | 0 | for (const auto& column : _schema->columns()) { |
555 | | // if (column.has_inverted_index()) { |
556 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
557 | 0 | for (const auto& index_meta : index_metas) { |
558 | 0 | std::string remote_inverted_index_file = |
559 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
560 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
561 | 0 | remote_seg_path), |
562 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
563 | 0 | std::string local_inverted_index_file = |
564 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
565 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
566 | 0 | local_seg_path), |
567 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
568 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
569 | 0 | local_paths.emplace_back(local_inverted_index_file); |
570 | 0 | } |
571 | 0 | } |
572 | 2 | } else { |
573 | 2 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
574 | 0 | std::string remote_inverted_index_file = |
575 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
576 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
577 | 0 | remote_seg_path)); |
578 | 0 | std::string local_inverted_index_file = |
579 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
580 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
581 | 0 | local_seg_path)); |
582 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
583 | 0 | local_paths.emplace_back(local_inverted_index_file); |
584 | 0 | } |
585 | 2 | } |
586 | 2 | } |
587 | 2 | auto st = dest_fs.fs->batch_upload(local_paths, dest_paths); |
588 | 2 | if (st.ok()) { |
589 | 2 | DorisMetrics::instance()->upload_rowset_count->increment(1); |
590 | 2 | DorisMetrics::instance()->upload_total_byte->increment(total_disk_size()); |
591 | 2 | } else { |
592 | 0 | DorisMetrics::instance()->upload_fail_count->increment(1); |
593 | 0 | } |
594 | 2 | return st; |
595 | 5 | } |
596 | | |
597 | 0 | Status BetaRowset::check_file_exist() { |
598 | 0 | const auto& fs = _rowset_meta->fs(); |
599 | 0 | if (!fs) { |
600 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
601 | 0 | _rowset_meta->resource_id()); |
602 | 0 | } |
603 | | |
604 | 0 | for (int i = 0; i < num_segments(); ++i) { |
605 | 0 | auto seg_path = DORIS_TRY(segment_path(i)); |
606 | 0 | bool seg_file_exist = false; |
607 | 0 | RETURN_IF_ERROR(fs->exists(seg_path, &seg_file_exist)); |
608 | 0 | if (!seg_file_exist) { |
609 | 0 | return Status::InternalError("data file not existed: {}, rowset_id={}", seg_path, |
610 | 0 | rowset_id().to_string()); |
611 | 0 | } |
612 | 0 | } |
613 | | |
614 | 0 | return Status::OK(); |
615 | 0 | } |
616 | | |
617 | 0 | Status BetaRowset::check_current_rowset_segment() { |
618 | 0 | const auto& fs = _rowset_meta->fs(); |
619 | 0 | if (!fs) { |
620 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
621 | 0 | _rowset_meta->resource_id()); |
622 | 0 | } |
623 | | |
624 | 0 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
625 | 0 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
626 | |
|
627 | 0 | std::shared_ptr<segment_v2::Segment> segment; |
628 | 0 | io::FileReaderOptions reader_options; |
629 | 0 | reader_options.cache_type = config::enable_file_cache |
630 | 0 | ? io::FileCachePolicy::FILE_BLOCK_CACHE |
631 | 0 | : io::FileCachePolicy::NO_CACHE; |
632 | 0 | reader_options.is_doris_table = true; |
633 | 0 | reader_options.file_size = _rowset_meta->segment_file_size(seg_id); |
634 | 0 | reader_options.tablet_id = _rowset_meta->tablet_id(); |
635 | 0 | reader_options.storage_resource_id = _rowset_meta->resource_id(); |
636 | |
|
637 | 0 | auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, |
638 | 0 | rowset_id(), _schema, reader_options, &segment, |
639 | 0 | _rowset_meta->inverted_index_file_info(seg_id)); |
640 | 0 | if (!s.ok()) { |
641 | 0 | LOG(WARNING) << "segment can not be opened. file=" << seg_path; |
642 | 0 | return s; |
643 | 0 | } |
644 | 0 | } |
645 | | |
646 | 0 | return Status::OK(); |
647 | 0 | } |
648 | | |
649 | 4 | Status BetaRowset::add_to_binlog() { |
650 | | // FIXME(Drogon): not only local file system |
651 | 4 | if (!is_local()) { |
652 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
653 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
654 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
655 | 0 | } |
656 | | |
657 | 4 | const auto& fs = io::global_local_filesystem(); |
658 | 4 | auto segments_num = num_segments(); |
659 | 4 | VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}", |
660 | 0 | rowset_id().to_string(), segments_num); |
661 | | |
662 | 4 | Status status; |
663 | 4 | std::vector<std::string> linked_success_files; |
664 | 4 | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
665 | 4 | if (!status.ok()) { |
666 | 0 | LOG(WARNING) << "will delete linked success files due to error " |
667 | 0 | << status.to_string_no_stack(); |
668 | 0 | std::vector<io::Path> paths; |
669 | 0 | for (auto& file : linked_success_files) { |
670 | 0 | paths.emplace_back(file); |
671 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
672 | 0 | } |
673 | 0 | static_cast<void>(fs->batch_delete(paths)); |
674 | 0 | LOG(WARNING) << "done delete linked success files due to error " |
675 | 0 | << status.to_string_no_stack(); |
676 | 0 | } |
677 | 4 | }}; |
678 | | |
679 | | // The publish_txn might fail even if the add_to_binlog success, so we need to check |
680 | | // whether a file already exists before linking. |
681 | 4 | auto errno_is_file_exists = []() { return Errno::no() == EEXIST; }; |
682 | | |
683 | | // all segments are in the same directory, so cache binlog_dir without multi times check |
684 | 4 | std::string binlog_dir; |
685 | 6 | for (int i = 0; i < segments_num; ++i) { |
686 | 2 | auto seg_file = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
687 | | |
688 | 2 | if (binlog_dir.empty()) { |
689 | 2 | binlog_dir = std::filesystem::path(seg_file).parent_path().append("_binlog").string(); |
690 | | |
691 | 2 | bool exists = true; |
692 | 2 | RETURN_IF_ERROR(fs->exists(binlog_dir, &exists)); |
693 | 2 | if (!exists) { |
694 | 2 | RETURN_IF_ERROR(fs->create_directory(binlog_dir)); |
695 | 2 | } |
696 | 2 | } |
697 | | |
698 | 2 | auto binlog_file = |
699 | 2 | (std::filesystem::path(binlog_dir) / std::filesystem::path(seg_file).filename()) |
700 | 2 | .string(); |
701 | 2 | VLOG_DEBUG << "link " << seg_file << " to " << binlog_file; |
702 | 2 | if (!fs->link_file(seg_file, binlog_file).ok() && !errno_is_file_exists()) { |
703 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
704 | 0 | seg_file, binlog_file, Errno::no()); |
705 | 0 | return status; |
706 | 0 | } |
707 | 2 | linked_success_files.push_back(binlog_file); |
708 | | |
709 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
710 | 2 | for (const auto& index : _schema->inverted_indexes()) { |
711 | 2 | auto index_id = index->index_id(); |
712 | 2 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( |
713 | 2 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file), index_id, |
714 | 2 | index->get_index_suffix()); |
715 | 2 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
716 | 2 | std::filesystem::path(index_file).filename()) |
717 | 2 | .string(); |
718 | 2 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
719 | 2 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
720 | 0 | status = Status::Error<OS_ERROR>( |
721 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
722 | 0 | binlog_index_file, Errno::no()); |
723 | 0 | return status; |
724 | 0 | } |
725 | 2 | linked_success_files.push_back(binlog_index_file); |
726 | 2 | } |
727 | 1 | } else { |
728 | 1 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
729 | 1 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
730 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file)); |
731 | 1 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
732 | 1 | std::filesystem::path(index_file).filename()) |
733 | 1 | .string(); |
734 | 1 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
735 | 1 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
736 | 0 | status = Status::Error<OS_ERROR>( |
737 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
738 | 0 | binlog_index_file, Errno::no()); |
739 | 0 | return status; |
740 | 0 | } |
741 | 1 | linked_success_files.push_back(binlog_index_file); |
742 | 1 | } |
743 | 1 | } |
744 | 2 | } |
745 | | |
746 | 4 | return Status::OK(); |
747 | 4 | } |
748 | | |
749 | 10 | Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) { |
750 | 10 | const auto& fs = _rowset_meta->fs(); |
751 | 10 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc", |
752 | 10 | { return Status::Error<OS_ERROR>("fault_inject calc_file_crc error"); }); |
753 | 10 | if (num_segments() < 1) { |
754 | 2 | *crc_value = 0x92a8fc17; // magic code from crc32c table |
755 | 2 | return Status::OK(); |
756 | 2 | } |
757 | | |
758 | | // 1. pick up all the files including dat file and idx file |
759 | 8 | std::vector<io::Path> file_paths; |
760 | 16 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
761 | 8 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
762 | 8 | file_paths.emplace_back(seg_path); |
763 | 8 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
764 | 0 | for (const auto& column : _schema->columns()) { |
765 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
766 | 0 | for (const auto& index_meta : index_metas) { |
767 | 0 | std::string inverted_index_file = |
768 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
769 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
770 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
771 | 0 | file_paths.emplace_back(std::move(inverted_index_file)); |
772 | 0 | } |
773 | 0 | } |
774 | 8 | } else { |
775 | 8 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
776 | 8 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
777 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
778 | 8 | file_paths.emplace_back(std::move(inverted_index_file)); |
779 | 8 | } |
780 | 8 | } |
781 | 8 | } |
782 | 8 | *crc_value = 0; |
783 | 8 | *file_count = file_paths.size(); |
784 | 8 | if (!is_local()) { |
785 | 8 | return Status::OK(); |
786 | 8 | } |
787 | | |
788 | | // 2. calculate the md5sum of each file |
789 | 0 | const auto& local_fs = io::global_local_filesystem(); |
790 | 0 | DCHECK(!file_paths.empty()); |
791 | 0 | std::vector<std::string> all_file_md5; |
792 | 0 | all_file_md5.reserve(file_paths.size()); |
793 | 0 | for (const auto& file_path : file_paths) { |
794 | 0 | std::string file_md5sum; |
795 | 0 | auto status = local_fs->md5sum(file_path, &file_md5sum); |
796 | 0 | if (!status.ok()) { |
797 | 0 | return status; |
798 | 0 | } |
799 | 0 | VLOG_CRITICAL << fmt::format("calc file_md5sum finished. file_path={}, md5sum={}", |
800 | 0 | file_path.string(), file_md5sum); |
801 | 0 | all_file_md5.emplace_back(std::move(file_md5sum)); |
802 | 0 | } |
803 | 0 | std::sort(all_file_md5.begin(), all_file_md5.end()); |
804 | | |
805 | | // 3. calculate the crc_value based on all_file_md5 |
806 | 0 | DCHECK(file_paths.size() == all_file_md5.size()); |
807 | 0 | for (auto& i : all_file_md5) { |
808 | 0 | *crc_value = crc32c::Extend(*crc_value, (const uint8_t*)i.data(), i.size()); |
809 | 0 | } |
810 | |
|
811 | 0 | return Status::OK(); |
812 | 0 | } |
813 | | |
814 | | Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value, |
815 | 147 | rapidjson::Document::AllocatorType& allocator) { |
816 | 147 | const auto& fs = _rowset_meta->fs(); |
817 | 147 | auto storage_format = _schema->get_inverted_index_storage_format(); |
818 | 147 | std::string format_str; |
819 | 147 | switch (storage_format) { |
820 | 7 | case InvertedIndexStorageFormatPB::V1: |
821 | 7 | format_str = "V1"; |
822 | 7 | break; |
823 | 71 | case InvertedIndexStorageFormatPB::V2: |
824 | 71 | format_str = "V2"; |
825 | 71 | break; |
826 | 69 | case InvertedIndexStorageFormatPB::V3: |
827 | 69 | format_str = "V3"; |
828 | 69 | break; |
829 | 0 | default: |
830 | 0 | return Status::InternalError("inverted index storage format error"); |
831 | 0 | break; |
832 | 147 | } |
833 | 147 | auto rs_id = rowset_id().to_string(); |
834 | 147 | rowset_value->AddMember("rowset_id", rapidjson::Value(rs_id.c_str(), allocator), allocator); |
835 | 147 | rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str.c_str(), allocator), |
836 | 147 | allocator); |
837 | 147 | rapidjson::Value segments(rapidjson::kArrayType); |
838 | 221 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
839 | 75 | rapidjson::Value segment(rapidjson::kObjectType); |
840 | 75 | segment.AddMember("segment_id", rapidjson::Value(seg_id).Move(), allocator); |
841 | | |
842 | 75 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
843 | 75 | auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); |
844 | 75 | auto index_file_reader = std::make_unique<IndexFileReader>( |
845 | 75 | fs, std::string(index_file_path_prefix), storage_format, InvertedIndexFileInfo(), |
846 | 75 | _rowset_meta->tablet_id()); |
847 | 75 | RETURN_IF_ERROR(index_file_reader->init()); |
848 | 74 | auto dirs = index_file_reader->get_all_directories(); |
849 | | |
850 | 74 | auto add_file_info_to_json = [&](const std::string& path, |
851 | 80 | rapidjson::Value& json_value) -> Status { |
852 | 80 | json_value.AddMember("idx_file_path", rapidjson::Value(path.c_str(), allocator), |
853 | 80 | allocator); |
854 | 80 | int64_t idx_file_size = 0; |
855 | 80 | auto st = fs->file_size(path, &idx_file_size); |
856 | 80 | if (st != Status::OK()) { |
857 | 0 | LOG(WARNING) << "show nested index file get file size error, file: " << path |
858 | 0 | << ", error: " << st.msg(); |
859 | 0 | return st; |
860 | 0 | } |
861 | 80 | json_value.AddMember("idx_file_size", rapidjson::Value(idx_file_size).Move(), |
862 | 80 | allocator); |
863 | 80 | return Status::OK(); |
864 | 80 | }; |
865 | | |
866 | 74 | auto process_files = [&allocator, &index_file_reader](auto& index_meta, |
867 | 74 | rapidjson::Value& indices, |
868 | 140 | rapidjson::Value& index) -> Status { |
869 | 140 | rapidjson::Value files_value(rapidjson::kArrayType); |
870 | 140 | std::vector<std::string> files; |
871 | 140 | auto ret = index_file_reader->open(&index_meta); |
872 | 140 | if (!ret.has_value()) { |
873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); |
874 | 0 | return Status::InternalError("IndexFileReader open error"); |
875 | 0 | } |
876 | 140 | using T = std::decay_t<decltype(ret)>; |
877 | 140 | auto reader = std::forward<T>(ret).value(); |
878 | 140 | reader->list(&files); |
879 | 860 | for (auto& file : files) { |
880 | 860 | rapidjson::Value file_value(rapidjson::kObjectType); |
881 | 860 | auto size = reader->fileLength(file.c_str()); |
882 | 860 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); |
883 | 860 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); |
884 | 860 | files_value.PushBack(file_value, allocator); |
885 | 860 | } |
886 | 140 | index.AddMember("files", files_value, allocator); |
887 | 140 | indices.PushBack(index, allocator); |
888 | 140 | return Status::OK(); |
889 | 140 | }; beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clINS_11TabletIndexEEENS_6StatusERT_RS8_SH_ Line | Count | Source | 868 | 128 | rapidjson::Value& index) -> Status { | 869 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); | 870 | 128 | std::vector<std::string> files; | 871 | 128 | auto ret = index_file_reader->open(&index_meta); | 872 | 128 | if (!ret.has_value()) { | 873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 874 | 0 | return Status::InternalError("IndexFileReader open error"); | 875 | 0 | } | 876 | 128 | using T = std::decay_t<decltype(ret)>; | 877 | 128 | auto reader = std::forward<T>(ret).value(); | 878 | 128 | reader->list(&files); | 879 | 788 | for (auto& file : files) { | 880 | 788 | rapidjson::Value file_value(rapidjson::kObjectType); | 881 | 788 | auto size = reader->fileLength(file.c_str()); | 882 | 788 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 883 | 788 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 884 | 788 | files_value.PushBack(file_value, allocator); | 885 | 788 | } | 886 | 128 | index.AddMember("files", files_value, allocator); | 887 | 128 | indices.PushBack(index, allocator); | 888 | 128 | return Status::OK(); | 889 | 128 | }; |
beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clIKNS_11TabletIndexEEENS_6StatusERT_RS8_SI_ Line | Count | Source | 868 | 12 | rapidjson::Value& index) -> Status { | 869 | 12 | rapidjson::Value files_value(rapidjson::kArrayType); | 870 | 12 | std::vector<std::string> files; | 871 | 12 | auto ret = index_file_reader->open(&index_meta); | 872 | 12 | if (!ret.has_value()) { | 873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 874 | 0 | return Status::InternalError("IndexFileReader open error"); | 875 | 0 | } | 876 | 12 | using T = std::decay_t<decltype(ret)>; | 877 | 12 | auto reader = std::forward<T>(ret).value(); | 878 | 12 | reader->list(&files); | 879 | 72 | for (auto& file : files) { | 880 | 72 | rapidjson::Value file_value(rapidjson::kObjectType); | 881 | 72 | auto size = reader->fileLength(file.c_str()); | 882 | 72 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 883 | 72 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 884 | 72 | files_value.PushBack(file_value, allocator); | 885 | 72 | } | 886 | 12 | index.AddMember("files", files_value, allocator); | 887 | 12 | indices.PushBack(index, allocator); | 888 | 12 | return Status::OK(); | 889 | 12 | }; |
|
890 | | |
891 | 74 | if (storage_format != InvertedIndexStorageFormatPB::V1) { |
892 | 68 | auto path = InvertedIndexDescriptor::get_index_file_path_v2(index_file_path_prefix); |
893 | 68 | auto st = add_file_info_to_json(path, segment); |
894 | 68 | if (!st.ok()) { |
895 | 0 | return st; |
896 | 0 | } |
897 | 68 | rapidjson::Value indices(rapidjson::kArrayType); |
898 | 128 | for (auto& dir : *dirs) { |
899 | 128 | rapidjson::Value index(rapidjson::kObjectType); |
900 | 128 | auto index_id = dir.first.first; |
901 | 128 | auto index_suffix = dir.first.second; |
902 | 128 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
903 | 128 | index.AddMember("index_suffix", rapidjson::Value(index_suffix.c_str(), allocator), |
904 | 128 | allocator); |
905 | | |
906 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); |
907 | 128 | std::vector<std::string> files; |
908 | 128 | doris::TabletIndexPB index_pb; |
909 | 128 | index_pb.set_index_id(index_id); |
910 | 128 | index_pb.set_index_suffix_name(index_suffix); |
911 | 128 | TabletIndex index_meta; |
912 | 128 | index_meta.init_from_pb(index_pb); |
913 | | |
914 | 128 | auto status = process_files(index_meta, indices, index); |
915 | 128 | if (!status.ok()) { |
916 | 0 | return status; |
917 | 0 | } |
918 | 128 | } |
919 | 68 | segment.AddMember("indices", indices, allocator); |
920 | 68 | segments.PushBack(segment, allocator); |
921 | 68 | } else { |
922 | 6 | rapidjson::Value indices(rapidjson::kArrayType); |
923 | 18 | for (auto column : _rowset_meta->tablet_schema()->columns()) { |
924 | 18 | auto index_metas = _rowset_meta->tablet_schema()->inverted_indexs(*column); |
925 | 18 | for (const auto& index_meta : index_metas) { |
926 | 12 | rapidjson::Value index(rapidjson::kObjectType); |
927 | 12 | auto index_id = index_meta->index_id(); |
928 | 12 | auto index_suffix = index_meta->get_index_suffix(); |
929 | 12 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
930 | 12 | index.AddMember("index_suffix", |
931 | 12 | rapidjson::Value(index_suffix.c_str(), allocator), allocator); |
932 | 12 | auto path = InvertedIndexDescriptor::get_index_file_path_v1( |
933 | 12 | index_file_path_prefix, index_id, index_suffix); |
934 | 12 | RETURN_IF_ERROR(add_file_info_to_json(path, index)); |
935 | 12 | RETURN_IF_ERROR(process_files(*index_meta, indices, index)); |
936 | 12 | } |
937 | 18 | } |
938 | 6 | segment.AddMember("indices", indices, allocator); |
939 | 6 | segments.PushBack(segment, allocator); |
940 | 6 | } |
941 | 74 | } |
942 | 146 | rowset_value->AddMember("segments", segments, allocator); |
943 | 146 | return Status::OK(); |
944 | 147 | } |
945 | | } // namespace doris |